This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e1fe395d [flink] Support updating row type nested in array/map in 
Flink (#4528)
7e1fe395d is described below

commit 7e1fe395df0831a9d65bf4b6126562252a7a7bf2
Author: tsreaper <[email protected]>
AuthorDate: Thu Nov 14 18:36:02 2024 +0800

    [flink] Support updating row type nested in array/map in Flink (#4528)
---
 ...CastFieldGetter.java => CastElementGetter.java} |  22 ++-
 .../org/apache/paimon/casting/CastFieldGetter.java |   3 +
 .../org/apache/paimon/casting/CastedArray.java     | 201 +++++++++++++++++++++
 .../java/org/apache/paimon/casting/CastedMap.java  |  70 +++++++
 .../java/org/apache/paimon/casting/CastedRow.java  |   2 -
 .../apache/paimon/schema/SchemaEvolutionUtil.java  |  99 ++++++----
 .../org/apache/paimon/schema/SchemaManager.java    |  44 ++++-
 .../apache/paimon/schema/SchemaManagerTest.java    |  50 ++++-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  41 ++++-
 .../apache/paimon/flink/SchemaChangeITCase.java    |  66 ++++++-
 10 files changed, 533 insertions(+), 65 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
similarity index 64%
copy from 
paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
copy to 
paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
index 02168300a..b8a91f572 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastElementGetter.java
@@ -18,22 +18,24 @@
 
 package org.apache.paimon.casting;
 
-import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalArray;
 
-/**
- * Get field value from row with given pos and cast it according to specific 
{@link CastExecutor}.
- */
-public class CastFieldGetter {
-    private final InternalRow.FieldGetter fieldGetter;
+/** Get element from array and cast it according to specific {@link 
CastExecutor}. */
+public class CastElementGetter {
+
+    private final InternalArray.ElementGetter elementGetter;
     private final CastExecutor<Object, Object> castExecutor;
 
-    public CastFieldGetter(InternalRow.FieldGetter fieldGetter, 
CastExecutor<?, ?> castExecutor) {
-        this.fieldGetter = fieldGetter;
+    @SuppressWarnings("unchecked")
+    public CastElementGetter(
+            InternalArray.ElementGetter elementGetter, CastExecutor<?, ?> 
castExecutor) {
+        this.elementGetter = elementGetter;
         this.castExecutor = (CastExecutor<Object, Object>) castExecutor;
     }
 
-    public <V> V getFieldOrNull(InternalRow row) {
-        Object value = fieldGetter.getFieldOrNull(row);
+    @SuppressWarnings("unchecked")
+    public <V> V getElementOrNull(InternalArray array, int pos) {
+        Object value = elementGetter.getElementOrNull(array, pos);
         return value == null ? null : (V) castExecutor.cast(value);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
index 02168300a..208ef5f30 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastFieldGetter.java
@@ -24,14 +24,17 @@ import org.apache.paimon.data.InternalRow;
  * Get field value from row with given pos and cast it according to specific 
{@link CastExecutor}.
  */
 public class CastFieldGetter {
+
     private final InternalRow.FieldGetter fieldGetter;
     private final CastExecutor<Object, Object> castExecutor;
 
+    @SuppressWarnings("unchecked")
     public CastFieldGetter(InternalRow.FieldGetter fieldGetter, 
CastExecutor<?, ?> castExecutor) {
         this.fieldGetter = fieldGetter;
         this.castExecutor = (CastExecutor<Object, Object>) castExecutor;
     }
 
+    @SuppressWarnings("unchecked")
     public <V> V getFieldOrNull(InternalRow row) {
         Object value = fieldGetter.getFieldOrNull(row);
         return value == null ? null : (V) castExecutor.cast(value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
new file mode 100644
index 000000000..778b11d1f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+
+/**
+ * An implementation of {@link InternalArray} which provides a casted view of 
the underlying {@link
+ * InternalArray}.
+ *
+ * <p>It reads data from underlying {@link InternalArray} according to source 
logical type and casts
+ * it with specific {@link CastExecutor}.
+ */
+public class CastedArray implements InternalArray {
+
+    private final CastElementGetter castElementGetter;
+    private InternalArray array;
+
+    protected CastedArray(CastElementGetter castElementGetter) {
+        this.castElementGetter = castElementGetter;
+    }
+
+    /**
+     * Creates a {@link CastedArray} which casts elements with the given
+     * getter. The returned object is reusable: {@link #replaceArray}
+     * replaces the underlying array in place and does not return a new
+     * object. This is done for performance reasons.
+     */
+    public static CastedArray from(CastElementGetter castElementGetter) {
+        return new CastedArray(castElementGetter);
+    }
+
+    public CastedArray replaceArray(InternalArray array) {
+        this.array = array;
+        return this;
+    }
+
+    @Override
+    public int size() {
+        return array.size();
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        boolean[] result = new boolean[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        byte[] result = new byte[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public short[] toShortArray() {
+        short[] result = new short[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public int[] toIntArray() {
+        int[] result = new int[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        long[] result = new long[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        float[] result = new float[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        double[] result = new double[size()];
+        for (int i = 0; i < result.length; i++) {
+            result[i] = castElementGetter.getElementOrNull(array, i);
+        }
+        return result;
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return castElementGetter.getElementOrNull(array, pos) == null;
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return castElementGetter.getElementOrNull(array, pos);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java
new file mode 100644
index 000000000..4068407ca
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedMap.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+
+/**
+ * An implementation of {@link InternalMap} which provides a casted view of 
the underlying {@link
+ * InternalMap}.
+ *
+ * <p>It reads data from underlying {@link InternalMap} according to source 
logical type and casts
+ * it with specific {@link CastExecutor}.
+ */
+public class CastedMap implements InternalMap {
+
+    private final CastedArray castedValueArray;
+    private InternalMap map;
+
+    protected CastedMap(CastElementGetter castValueGetter) {
+        this.castedValueArray = CastedArray.from(castValueGetter);
+    }
+
+    /**
+     * Creates a {@link CastedMap} which casts map values with the given
+     * getter. The returned object is reusable: {@link #replaceMap} replaces
+     * the underlying map in place and does not return a new object. This
+     * is done for performance reasons.
+     */
+    public static CastedMap from(CastElementGetter castValueGetter) {
+        return new CastedMap(castValueGetter);
+    }
+
+    public CastedMap replaceMap(InternalMap map) {
+        this.castedValueArray.replaceArray(map.valueArray());
+        this.map = map;
+        return this;
+    }
+
+    @Override
+    public int size() {
+        return map.size();
+    }
+
+    @Override
+    public InternalArray keyArray() {
+        return map.keyArray();
+    }
+
+    @Override
+    public InternalArray valueArray() {
+        return castedValueArray;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
index 25c574425..f9216d10b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java
@@ -34,8 +34,6 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
  *
  * <p>It reads data from underlying {@link InternalRow} according to source 
logical type and casts
  * it with specific {@link CastExecutor}.
- *
- * <p>Note: This class supports only top-level castings, not nested castings.
  */
 public class CastedRow implements InternalRow {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index b5d730707..0ae2798c2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -19,10 +19,15 @@
 package org.apache.paimon.schema;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.casting.CastElementGetter;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.casting.CastFieldGetter;
+import org.apache.paimon.casting.CastedArray;
+import org.apache.paimon.casting.CastedMap;
 import org.apache.paimon.casting.CastedRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -31,7 +36,6 @@ import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowUtils;
 import org.apache.paimon.utils.ProjectedRow;
@@ -372,6 +376,7 @@ public class SchemaEvolutionUtil {
             List<DataField> tableFields, List<DataField> dataFields, int[] 
indexMapping) {
         CastFieldGetter[] converterMapping = new 
CastFieldGetter[tableFields.size()];
         boolean castExist = false;
+
         for (int i = 0; i < tableFields.size(); i++) {
             int dataIndex = indexMapping == null ? i : indexMapping[i];
             if (dataIndex < 0) {
@@ -380,53 +385,39 @@ public class SchemaEvolutionUtil {
             } else {
                 DataField tableField = tableFields.get(i);
                 DataField dataField = dataFields.get(dataIndex);
-                if (dataField.type().equalsIgnoreNullable(tableField.type())) {
-                    // Create getter with index i and projected row data will 
convert to underlying
-                    // data
-                    converterMapping[i] =
-                            new CastFieldGetter(
-                                    
InternalRowUtils.createNullCheckingFieldGetter(
-                                            dataField.type(), i),
-                                    CastExecutors.identityCastExecutor());
-                } else {
-                    // TODO support column type evolution in nested type
-                    checkState(
-                            !(tableField.type() instanceof MapType
-                                    || dataField.type() instanceof ArrayType
-                                    || dataField.type() instanceof 
MultisetType),
-                            "Only support column type evolution in atomic and 
row data type.");
-
-                    CastExecutor<?, ?> castExecutor;
-                    if (tableField.type() instanceof RowType
-                            && dataField.type() instanceof RowType) {
-                        castExecutor =
-                                createRowCastExecutor(
-                                        (RowType) dataField.type(), (RowType) 
tableField.type());
-                    } else {
-                        castExecutor = CastExecutors.resolve(dataField.type(), 
tableField.type());
-                    }
-                    checkNotNull(
-                            castExecutor,
-                            "Cannot cast from type "
-                                    + dataField.type()
-                                    + " to type "
-                                    + tableField.type());
-
-                    // Create getter with index i and projected row data will 
convert to underlying
-                    // data
-                    converterMapping[i] =
-                            new CastFieldGetter(
-                                    
InternalRowUtils.createNullCheckingFieldGetter(
-                                            dataField.type(), i),
-                                    castExecutor);
+                if (!dataField.type().equalsIgnoreNullable(tableField.type())) 
{
                     castExist = true;
                 }
+
+                // Create getter with index i and projected row data will 
convert to underlying data
+                converterMapping[i] =
+                        new CastFieldGetter(
+                                
InternalRowUtils.createNullCheckingFieldGetter(dataField.type(), i),
+                                createCastExecutor(dataField.type(), 
tableField.type()));
             }
         }
 
         return castExist ? converterMapping : null;
     }
 
+    private static CastExecutor<?, ?> createCastExecutor(DataType inputType, 
DataType targetType) {
+        if (targetType.equalsIgnoreNullable(inputType)) {
+            return CastExecutors.identityCastExecutor();
+        } else if (inputType instanceof RowType && targetType instanceof 
RowType) {
+            return createRowCastExecutor((RowType) inputType, (RowType) 
targetType);
+        } else if (inputType instanceof ArrayType && targetType instanceof 
ArrayType) {
+            return createArrayCastExecutor((ArrayType) inputType, (ArrayType) 
targetType);
+        } else if (inputType instanceof MapType && targetType instanceof 
MapType) {
+            return createMapCastExecutor((MapType) inputType, (MapType) 
targetType);
+        } else {
+            return checkNotNull(
+                    CastExecutors.resolve(inputType, targetType),
+                    "Cannot cast from type %s to type %s",
+                    inputType,
+                    targetType);
+        }
+    }
+
     private static CastExecutor<InternalRow, InternalRow> 
createRowCastExecutor(
             RowType inputType, RowType targetType) {
         int[] indexMapping = createIndexMapping(targetType.getFields(), 
inputType.getFields());
@@ -446,4 +437,32 @@ public class SchemaEvolutionUtil {
             return value;
         };
     }
+
+    private static CastExecutor<InternalArray, InternalArray> 
createArrayCastExecutor(
+            ArrayType inputType, ArrayType targetType) {
+        CastElementGetter castElementGetter =
+                new CastElementGetter(
+                        
InternalArray.createElementGetter(inputType.getElementType()),
+                        createCastExecutor(
+                                inputType.getElementType(), 
targetType.getElementType()));
+
+        CastedArray castedArray = CastedArray.from(castElementGetter);
+        return castedArray::replaceArray;
+    }
+
+    private static CastExecutor<InternalMap, InternalMap> 
createMapCastExecutor(
+            MapType inputType, MapType targetType) {
+        checkState(
+                inputType.getKeyType().equals(targetType.getKeyType()),
+                "Cannot cast map type %s to map type %s, because they have 
different key types.",
+                inputType.getKeyType(),
+                targetType.getKeyType());
+        CastElementGetter castElementGetter =
+                new CastElementGetter(
+                        
InternalArray.createElementGetter(inputType.getValueType()),
+                        createCastExecutor(inputType.getValueType(), 
targetType.getValueType()));
+
+        CastedMap castedMap = CastedMap.from(castElementGetter);
+        return castedMap::replaceMap;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 86e365a88..a84348810 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -37,9 +37,11 @@ import 
org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
 import org.apache.paimon.schema.SchemaChange.UpdateComment;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
@@ -636,17 +638,17 @@ public class SchemaManager implements Serializable {
                     continue;
                 }
 
+                String fullFieldName =
+                        String.join(".", 
Arrays.asList(updateFieldNames).subList(0, depth + 1));
                 List<DataField> nestedFields =
-                        new ArrayList<>(
-                                ((org.apache.paimon.types.RowType) 
field.type()).getFields());
+                        new ArrayList<>(extractRowType(field.type(), 
fullFieldName).getFields());
                 updateIntermediateColumn(nestedFields, depth + 1);
                 newFields.set(
                         i,
                         new DataField(
                                 field.id(),
                                 field.name(),
-                                new org.apache.paimon.types.RowType(
-                                        field.type().isNullable(), 
nestedFields),
+                                wrapNewRowType(field.type(), nestedFields),
                                 field.description()));
                 return;
             }
@@ -656,6 +658,40 @@ public class SchemaManager implements Serializable {
                     String.join(".", 
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
         }
 
+        private RowType extractRowType(DataType type, String fullFieldName) {
+            switch (type.getTypeRoot()) {
+                case ROW:
+                    return (RowType) type;
+                case ARRAY:
+                    return extractRowType(((ArrayType) type).getElementType(), 
fullFieldName);
+                case MAP:
+                    return extractRowType(((MapType) type).getValueType(), 
fullFieldName);
+                default:
+                    throw new IllegalArgumentException(
+                            fullFieldName + " is not a structured type.");
+            }
+        }
+
+        private DataType wrapNewRowType(DataType type, List<DataField> 
nestedFields) {
+            switch (type.getTypeRoot()) {
+                case ROW:
+                    return new RowType(type.isNullable(), nestedFields);
+                case ARRAY:
+                    return new ArrayType(
+                            type.isNullable(),
+                            wrapNewRowType(((ArrayType) 
type).getElementType(), nestedFields));
+                case MAP:
+                    MapType mapType = (MapType) type;
+                    return new MapType(
+                            type.isNullable(),
+                            mapType.getKeyType(),
+                            wrapNewRowType(mapType.getValueType(), 
nestedFields));
+                default:
+                    throw new IllegalStateException(
+                            "Trying to wrap a row type in " + type + ". This 
is unexpected.");
+            }
+        }
+
         protected abstract void updateLastColumn(List<DataField> newFields, 
String fieldName)
                 throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException;
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 088cb72f9..f0d654369 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -690,7 +691,7 @@ public class SchemaManagerTest {
 
         SchemaChange updateColumnType =
                 SchemaChange.updateColumnType(
-                        new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(), 
true);
+                        new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(), 
false);
         manager.commitChanges(updateColumnType);
 
         innerType =
@@ -708,8 +709,53 @@ public class SchemaManagerTest {
 
         SchemaChange middleColumnNotExistUpdateColumnType =
                 SchemaChange.updateColumnType(
-                        new String[] {"v", "invalid", "f1"}, 
DataTypes.BIGINT(), true);
+                        new String[] {"v", "invalid", "f1"}, 
DataTypes.BIGINT(), false);
         assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistUpdateColumnType))
                 .hasMessageContaining("Column v.invalid does not exist");
     }
+
+    @Test
+    public void testUpdateRowTypeInArrayAndMap() throws Exception {
+        RowType innerType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.INT()),
+                        new DataField(3, "f2", DataTypes.BIGINT()));
+        RowType outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()),
+                        new DataField(
+                                1, "v", new ArrayType(new 
MapType(DataTypes.INT(), innerType))));
+
+        Schema schema =
+                new Schema(
+                        outerType.getFields(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+        manager.createTable(schema);
+
+        SchemaChange addColumn =
+                SchemaChange.addColumn(
+                        new String[] {"v", "f3"},
+                        DataTypes.STRING(),
+                        null,
+                        SchemaChange.Move.first("f3"));
+        SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", 
"f2"});
+        SchemaChange updateColumnType =
+                SchemaChange.updateColumnType(new String[] {"v", "f1"}, 
DataTypes.BIGINT(), false);
+        manager.commitChanges(addColumn, dropColumn, updateColumnType);
+
+        innerType =
+                RowType.of(
+                        new DataField(4, "f3", DataTypes.STRING()),
+                        new DataField(2, "f1", DataTypes.BIGINT()));
+        outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()),
+                        new DataField(
+                                1, "v", new ArrayType(new 
MapType(DataTypes.INT(), innerType))));
+        
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ae30fa569..09fc0328e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -673,14 +673,13 @@ public class FlinkCatalog extends AbstractCatalog {
             org.apache.paimon.types.DataType oldType,
             org.apache.paimon.types.DataType newType,
             List<SchemaChange> schemaChanges) {
+        String joinedNames = String.join(".", fieldNames);
         if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
             Preconditions.checkArgument(
                     newType.getTypeRoot() == DataTypeRoot.ROW,
-                    "Column "
-                            + String.join(".", fieldNames)
-                            + " can only be updated to row type, and cannot be 
updated to "
-                            + newType
-                            + " type");
+                    "Column %s can only be updated to row type, and cannot be 
updated to %s type",
+                    joinedNames,
+                    newType.getTypeRoot());
             org.apache.paimon.types.RowType oldRowType = 
(org.apache.paimon.types.RowType) oldType;
             org.apache.paimon.types.RowType newRowType = 
(org.apache.paimon.types.RowType) newType;
 
@@ -699,7 +698,7 @@ public class FlinkCatalog extends AbstractCatalog {
                             lastIdx < idx,
                             "Order of existing fields in column %s must be 
kept the same. "
                                     + "However, field %s and %s have changed 
their orders.",
-                            String.join(".", fieldNames),
+                            joinedNames,
                             lastFieldName,
                             name);
                     lastIdx = idx;
@@ -751,6 +750,36 @@ public class FlinkCatalog extends AbstractCatalog {
                             fullFieldNames, oldField.type(), field.type(), schemaChanges);
                 }
             }
+        } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) {
+            Preconditions.checkArgument(
+                    newType.getTypeRoot() == DataTypeRoot.ARRAY,
+                    "Column %s can only be updated to array type, and cannot be updated to %s type",
+                    joinedNames,
+                    newType);
+            generateNestedColumnUpdates(
+                    fieldNames,
+                    ((org.apache.paimon.types.ArrayType) oldType).getElementType(),
+                    ((org.apache.paimon.types.ArrayType) newType).getElementType(),
+                    schemaChanges);
+        } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) {
+            Preconditions.checkArgument(
+                    newType.getTypeRoot() == DataTypeRoot.MAP,
+                    "Column %s can only be updated to map type, and cannot be updated to %s type",
+                    joinedNames,
+                    newType);
+            org.apache.paimon.types.MapType oldMapType = (org.apache.paimon.types.MapType) oldType;
+            org.apache.paimon.types.MapType newMapType = (org.apache.paimon.types.MapType) newType;
+            Preconditions.checkArgument(
+                    oldMapType.getKeyType().equals(newMapType.getKeyType()),
+                    "Cannot update key type of column %s from %s type to %s type",
+                    joinedNames,
+                    oldMapType.getKeyType(),
+                    newMapType.getKeyType());
+            generateNestedColumnUpdates(
+                    fieldNames,
+                    oldMapType.getValueType(),
+                    newMapType.getValueType(),
+                    schemaChanges);
         } else {
             if (!oldType.equalsIgnoreNullable(newType)) {
                 schemaChanges.add(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index a2ef1d5c8..a8e833215 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -1133,6 +1134,69 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
 
         assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 INT, f3 STRING))"))
                 .hasRootCauseMessage(
-                        "Column v.f2 can only be updated to row type, and cannot be updated to INT type");
+                        "Column v.f2 can only be updated to row type, and cannot be updated to INTEGER type");
+    }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateRowInArrayAndMap(String formatType) {
+        sql(
+                "CREATE TABLE T "
+                        + "( k INT, v1 ARRAY<ROW(f1 INT, f2 STRING)>, v2 MAP<INT, ROW(f1 STRING, f2 INT)>, PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( 'bucket' = '1', 'file.format' = '"
+                        + formatType
+                        + "' )");
+        sql(
+                "INSERT INTO T VALUES "
+                        + "(1, ARRAY[ROW(100, 'apple'), ROW(101, 'banana')], MAP[100, ROW('cat', 1000), 101, ROW('dog', 1001)]), "
+                        + "(2, ARRAY[ROW(200, 'pear'), ROW(201, 'grape')], MAP[200, ROW('tiger', 2000), 201, ROW('wolf', 2001)])");
+
+        Map<Integer, Row> map1 = new HashMap<>();
+        map1.put(100, Row.of("cat", 1000));
+        map1.put(101, Row.of("dog", 1001));
+        Map<Integer, Row> map2 = new HashMap<>();
+        map2.put(200, Row.of("tiger", 2000));
+        map2.put(201, Row.of("wolf", 2001));
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, new Row[] {Row.of(100, "apple"), Row.of(101, "banana")}, map1),
+                        Row.of(2, new Row[] {Row.of(200, "pear"), Row.of(201, "grape")}, map2));
+
+        sql(
+                "ALTER TABLE T MODIFY (v1 ARRAY<ROW(f1 BIGINT, f2 STRING, f3 STRING)>, v2 MAP<INT, ROW(f3 DOUBLE, f2 INT)>)");
+        sql(
+                "INSERT INTO T VALUES "
+                        + "(1, ARRAY[ROW(1000000000000, 'apple', 'A'), ROW(1000000000001, 'banana', 'B')], MAP[100, ROW(1000.0, 1000), 101, ROW(1001.0, 1001)]), "
+                        + "(3, ARRAY[ROW(3000000000000, 'mango', 'M'), ROW(3000000000001, 'cherry', 'C')], MAP[300, ROW(3000.0, 3000), 301, ROW(3001.0, 3001)])");
+
+        map1.clear();
+        map1.put(100, Row.of(1000.0, 1000));
+        map1.put(101, Row.of(1001.0, 1001));
+        map2.clear();
+        map2.put(200, Row.of(null, 2000));
+        map2.put(201, Row.of(null, 2001));
+        Map<Integer, Row> map3 = new HashMap<>();
+        map3.put(300, Row.of(3000.0, 3000));
+        map3.put(301, Row.of(3001.0, 3001));
+        assertThat(sql("SELECT v2, v1, k FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                map1,
+                                new Row[] {
+                                    Row.of(1000000000000L, "apple", "A"),
+                                    Row.of(1000000000001L, "banana", "B")
+                                },
+                                1),
+                        Row.of(
+                                map2,
+                                new Row[] {Row.of(200L, "pear", null), Row.of(201L, "grape", null)},
+                                2),
+                        Row.of(
+                                map3,
+                                new Row[] {
+                                    Row.of(3000000000000L, "mango", "M"),
+                                    Row.of(3000000000001L, "cherry", "C")
+                                },
+                                3));
     }
 }


Reply via email to