This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 60f6611e5e [flink] Support updating row type to another row type in Flink (#4499)
60f6611e5e is described below

commit 60f6611e5ea07404fb000c94565b9186057ec302
Author: tsreaper <[email protected]>
AuthorDate: Tue Nov 12 22:21:54 2024 +0800

    [flink] Support updating row type to another row type in Flink (#4499)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |   3 +-
 .../org/apache/paimon/schema/SchemaChange.java     |  56 +++++-----
 .../org/apache/paimon/schema/SchemaManager.java    |  22 ++--
 .../apache/paimon/schema/SchemaManagerTest.java    |  20 ++--
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |   8 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 120 ++++++++++++++++++---
 .../apache/paimon/flink/SchemaChangeITCase.java    |  51 ++++++++-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   8 +-
 8 files changed, 213 insertions(+), 75 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c2e4afe5d5..a1cf941cda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -48,6 +48,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -560,7 +561,7 @@ public abstract class AbstractCatalog implements Catalog {
         for (SchemaChange change : changes) {
             if (change instanceof SchemaChange.AddColumn) {
                 SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) 
change;
-                fieldNames.addAll(addColumn.fieldNames());
+                fieldNames.addAll(Arrays.asList(addColumn.fieldNames()));
             } else if (change instanceof SchemaChange.RenameColumn) {
                 SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) 
change;
                 fieldNames.add(rename.newName());
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 1c1d601bce..cefa3c6eb9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -25,8 +25,6 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -54,46 +52,45 @@ public interface SchemaChange extends Serializable {
     }
 
     static SchemaChange addColumn(String fieldName, DataType dataType, String 
comment) {
-        return new AddColumn(Collections.singletonList(fieldName), dataType, 
comment, null);
+        return new AddColumn(new String[] {fieldName}, dataType, comment, 
null);
     }
 
     static SchemaChange addColumn(String fieldName, DataType dataType, String 
comment, Move move) {
-        return new AddColumn(Collections.singletonList(fieldName), dataType, 
comment, move);
+        return new AddColumn(new String[] {fieldName}, dataType, comment, 
move);
     }
 
     static SchemaChange addColumn(
-            List<String> fieldNames, DataType dataType, String comment, Move 
move) {
+            String[] fieldNames, DataType dataType, String comment, Move move) 
{
         return new AddColumn(fieldNames, dataType, comment, move);
     }
 
     static SchemaChange renameColumn(String fieldName, String newName) {
-        return new RenameColumn(Collections.singletonList(fieldName), newName);
+        return new RenameColumn(new String[] {fieldName}, newName);
     }
 
-    static SchemaChange renameColumn(List<String> fieldNames, String newName) {
+    static SchemaChange renameColumn(String[] fieldNames, String newName) {
         return new RenameColumn(fieldNames, newName);
     }
 
     static SchemaChange dropColumn(String fieldName) {
-        return new DropColumn(Collections.singletonList(fieldName));
+        return new DropColumn(new String[] {fieldName});
     }
 
-    static SchemaChange dropColumn(List<String> fieldNames) {
+    static SchemaChange dropColumn(String[] fieldNames) {
         return new DropColumn(fieldNames);
     }
 
     static SchemaChange updateColumnType(String fieldName, DataType 
newDataType) {
-        return new UpdateColumnType(Collections.singletonList(fieldName), 
newDataType, false);
+        return new UpdateColumnType(new String[] {fieldName}, newDataType, 
false);
     }
 
     static SchemaChange updateColumnType(
             String fieldName, DataType newDataType, boolean keepNullability) {
-        return new UpdateColumnType(
-                Collections.singletonList(fieldName), newDataType, 
keepNullability);
+        return new UpdateColumnType(new String[] {fieldName}, newDataType, 
keepNullability);
     }
 
     static SchemaChange updateColumnType(
-            List<String> fieldNames, DataType newDataType, boolean 
keepNullability) {
+            String[] fieldNames, DataType newDataType, boolean 
keepNullability) {
         return new UpdateColumnType(fieldNames, newDataType, keepNullability);
     }
 
@@ -228,20 +225,19 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final List<String> fieldNames;
+        private final String[] fieldNames;
         private final DataType dataType;
         private final String description;
         private final Move move;
 
-        private AddColumn(
-                List<String> fieldNames, DataType dataType, String 
description, Move move) {
+        private AddColumn(String[] fieldNames, DataType dataType, String 
description, Move move) {
             this.fieldNames = fieldNames;
             this.dataType = dataType;
             this.description = description;
             this.move = move;
         }
 
-        public List<String> fieldNames() {
+        public String[] fieldNames() {
             return fieldNames;
         }
 
@@ -268,7 +264,7 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             AddColumn addColumn = (AddColumn) o;
-            return Objects.equals(fieldNames, addColumn.fieldNames)
+            return Arrays.equals(fieldNames, addColumn.fieldNames)
                     && dataType.equals(addColumn.dataType)
                     && Objects.equals(description, addColumn.description)
                     && move.equals(addColumn.move);
@@ -288,15 +284,15 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final List<String> fieldNames;
+        private final String[] fieldNames;
         private final String newName;
 
-        private RenameColumn(List<String> fieldNames, String newName) {
+        private RenameColumn(String[] fieldNames, String newName) {
             this.fieldNames = fieldNames;
             this.newName = newName;
         }
 
-        public List<String> fieldNames() {
+        public String[] fieldNames() {
             return fieldNames;
         }
 
@@ -313,7 +309,7 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             RenameColumn that = (RenameColumn) o;
-            return Objects.equals(fieldNames, that.fieldNames)
+            return Arrays.equals(fieldNames, that.fieldNames)
                     && Objects.equals(newName, that.newName);
         }
 
@@ -330,13 +326,13 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final List<String> fieldNames;
+        private final String[] fieldNames;
 
-        private DropColumn(List<String> fieldNames) {
+        private DropColumn(String[] fieldNames) {
             this.fieldNames = fieldNames;
         }
 
-        public List<String> fieldNames() {
+        public String[] fieldNames() {
             return fieldNames;
         }
 
@@ -349,7 +345,7 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             DropColumn that = (DropColumn) o;
-            return Objects.equals(fieldNames, that.fieldNames);
+            return Arrays.equals(fieldNames, that.fieldNames);
         }
 
         @Override
@@ -363,19 +359,19 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final List<String> fieldNames;
+        private final String[] fieldNames;
         private final DataType newDataType;
         // If true, do not change the target field nullability
         private final boolean keepNullability;
 
         private UpdateColumnType(
-                List<String> fieldNames, DataType newDataType, boolean 
keepNullability) {
+                String[] fieldNames, DataType newDataType, boolean 
keepNullability) {
             this.fieldNames = fieldNames;
             this.newDataType = newDataType;
             this.keepNullability = keepNullability;
         }
 
-        public List<String> fieldNames() {
+        public String[] fieldNames() {
             return fieldNames;
         }
 
@@ -396,7 +392,7 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             UpdateColumnType that = (UpdateColumnType) o;
-            return Objects.equals(fieldNames, that.fieldNames)
+            return Arrays.equals(fieldNames, that.fieldNames)
                     && newDataType.equals(that.newDataType);
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 86ed96d5b0..28cc69cf99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -290,7 +290,7 @@ public class SchemaManager implements Serializable {
                     DataType dataType =
                             ReassignFieldId.reassign(addColumn.dataType(), 
highestFieldId);
 
-                    new 
NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) {
+                    new NestedColumnModifier(addColumn.fieldNames()) {
                         @Override
                         protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
                                 throws Catalog.ColumnAlreadyExistException {
@@ -320,7 +320,7 @@ public class SchemaManager implements Serializable {
                 } else if (change instanceof RenameColumn) {
                     RenameColumn rename = (RenameColumn) change;
                     assertNotUpdatingPrimaryKeys(oldTableSchema, 
rename.fieldNames(), "rename");
-                    new NestedColumnModifier(rename.fieldNames().toArray(new 
String[0])) {
+                    new NestedColumnModifier(rename.fieldNames()) {
                         @Override
                         protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
                                 throws Catalog.ColumnNotExistException,
@@ -347,7 +347,7 @@ public class SchemaManager implements Serializable {
                 } else if (change instanceof DropColumn) {
                     DropColumn drop = (DropColumn) change;
                     dropColumnValidation(oldTableSchema, drop);
-                    new NestedColumnModifier(drop.fieldNames().toArray(new 
String[0])) {
+                    new NestedColumnModifier(drop.fieldNames()) {
                         @Override
                         protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
                                 throws Catalog.ColumnNotExistException {
@@ -364,7 +364,7 @@ public class SchemaManager implements Serializable {
                     assertNotUpdatingPrimaryKeys(oldTableSchema, 
update.fieldNames(), "update");
                     updateNestedColumn(
                             newFields,
-                            update.fieldNames().toArray(new String[0]),
+                            update.fieldNames(),
                             (field) -> {
                                 DataType targetType = update.newDataType();
                                 if (update.keepNullability()) {
@@ -558,8 +558,8 @@ public class SchemaManager implements Serializable {
 
         Map<String, String> columnNames = Maps.newHashMap();
         for (RenameColumn renameColumn : renames) {
-            if (renameColumn.fieldNames().size() == 1) {
-                columnNames.put(renameColumn.fieldNames().get(0), 
renameColumn.newName());
+            if (renameColumn.fieldNames().length == 1) {
+                columnNames.put(renameColumn.fieldNames()[0], 
renameColumn.newName());
             }
         }
 
@@ -571,10 +571,10 @@ public class SchemaManager implements Serializable {
 
     private static void dropColumnValidation(TableSchema schema, DropColumn 
change) {
         // primary keys and partition keys can't be nested columns
-        if (change.fieldNames().size() > 1) {
+        if (change.fieldNames().length > 1) {
             return;
         }
-        String columnToDrop = change.fieldNames().get(0);
+        String columnToDrop = change.fieldNames()[0];
         if (schema.partitionKeys().contains(columnToDrop)
                 || schema.primaryKeys().contains(columnToDrop)) {
             throw new UnsupportedOperationException(
@@ -583,12 +583,12 @@ public class SchemaManager implements Serializable {
     }
 
     private static void assertNotUpdatingPrimaryKeys(
-            TableSchema schema, List<String> fieldNames, String operation) {
+            TableSchema schema, String[] fieldNames, String operation) {
         // partition keys can't be nested columns
-        if (fieldNames.size() > 1) {
+        if (fieldNames.length > 1) {
             return;
         }
-        String columnToRename = fieldNames.get(0);
+        String columnToRename = fieldNames[0];
         if (schema.partitionKeys().contains(columnToRename)) {
             throw new UnsupportedOperationException(
                     String.format(
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index ac8d4cd91e..088cb72f92 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -555,7 +555,7 @@ public class SchemaManagerTest {
 
         SchemaChange addColumn =
                 SchemaChange.addColumn(
-                        Arrays.asList("v", "f2", "f3"),
+                        new String[] {"v", "f2", "f3"},
                         DataTypes.STRING(),
                         "",
                         SchemaChange.Move.after("f3", "f1"));
@@ -579,11 +579,11 @@ public class SchemaManagerTest {
                 .hasMessageContaining("Column v.f2.f3 already exists");
         SchemaChange middleColumnNotExistAddColumn =
                 SchemaChange.addColumn(
-                        Arrays.asList("v", "invalid", "f4"), 
DataTypes.STRING(), "", null);
+                        new String[] {"v", "invalid", "f4"}, 
DataTypes.STRING(), "", null);
         assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistAddColumn))
                 .hasMessageContaining("Column v.invalid does not exist");
 
-        SchemaChange dropColumn = SchemaChange.dropColumn(Arrays.asList("v", 
"f2", "f1"));
+        SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", 
"f2", "f1"});
         manager.commitChanges(dropColumn);
 
         innerType =
@@ -602,7 +602,7 @@ public class SchemaManagerTest {
         assertThatCode(() -> manager.commitChanges(dropColumn))
                 .hasMessageContaining("Column v.f2.f1 does not exist");
         SchemaChange middleColumnNotExistDropColumn =
-                SchemaChange.dropColumn(Arrays.asList("v", "invalid", "f2"));
+                SchemaChange.dropColumn(new String[] {"v", "invalid", "f2"});
         assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistDropColumn))
                 .hasMessageContaining("Column v.invalid does not exist");
     }
@@ -632,7 +632,7 @@ public class SchemaManagerTest {
         manager.createTable(schema);
 
         SchemaChange renameColumn =
-                SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"), 
"f100");
+                SchemaChange.renameColumn(new String[] {"v", "f2", "f1"}, 
"f100");
         manager.commitChanges(renameColumn);
 
         innerType =
@@ -649,17 +649,17 @@ public class SchemaManagerTest {
         
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
 
         SchemaChange middleColumnNotExistRenameColumn =
-                SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"), 
"f200");
+                SchemaChange.renameColumn(new String[] {"v", "invalid", "f2"}, 
"f200");
         assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistRenameColumn))
                 .hasMessageContaining("Column v.invalid does not exist");
 
         SchemaChange lastColumnNotExistRenameColumn =
-                SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"), 
"new_invalid");
+                SchemaChange.renameColumn(new String[] {"v", "f2", "invalid"}, 
"new_invalid");
         assertThatCode(() -> 
manager.commitChanges(lastColumnNotExistRenameColumn))
                 .hasMessageContaining("Column v.f2.invalid does not exist");
 
         SchemaChange newNameAlreadyExistRenameColumn =
-                SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"), 
"f100");
+                SchemaChange.renameColumn(new String[] {"v", "f2", "f2"}, 
"f100");
         assertThatCode(() -> 
manager.commitChanges(newNameAlreadyExistRenameColumn))
                 .hasMessageContaining("Column v.f2.f100 already exists");
     }
@@ -690,7 +690,7 @@ public class SchemaManagerTest {
 
         SchemaChange updateColumnType =
                 SchemaChange.updateColumnType(
-                        Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), 
true);
+                        new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(), 
true);
         manager.commitChanges(updateColumnType);
 
         innerType =
@@ -708,7 +708,7 @@ public class SchemaManagerTest {
 
         SchemaChange middleColumnNotExistUpdateColumnType =
                 SchemaChange.updateColumnType(
-                        Arrays.asList("v", "invalid", "f1"), 
DataTypes.BIGINT(), true);
+                        new String[] {"v", "invalid", "f1"}, 
DataTypes.BIGINT(), true);
         assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistUpdateColumnType))
                 .hasMessageContaining("Column v.invalid does not exist");
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 0e93fdb073..c2e928bd4a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -101,7 +101,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
             SchemaChange.UpdateColumnType updateColumnType =
                     (SchemaChange.UpdateColumnType) schemaChange;
             Preconditions.checkState(
-                    updateColumnType.fieldNames().size() == 1,
+                    updateColumnType.fieldNames().length == 1,
                     "Paimon CDC currently does not support nested type schema 
evolution.");
             TableSchema schema =
                     schemaManager
@@ -110,11 +110,11 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                                     () ->
                                             new RuntimeException(
                                                     "Table does not exist. 
This is unexpected."));
-            int idx = 
schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
+            int idx = 
schema.fieldNames().indexOf(updateColumnType.fieldNames()[0]);
             Preconditions.checkState(
                     idx >= 0,
                     "Field name "
-                            + updateColumnType.fieldNames().get(0)
+                            + updateColumnType.fieldNames()[0]
                             + " does not exist in table. This is unexpected.");
             DataType oldType = schema.fields().get(idx).type();
             DataType newType = updateColumnType.newDataType();
@@ -126,7 +126,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                     throw new UnsupportedOperationException(
                             String.format(
                                     "Cannot convert field %s from type %s to 
%s of Paimon table %s.",
-                                    updateColumnType.fieldNames().get(0),
+                                    updateColumnType.fieldNames()[0],
                                     oldType,
                                     newType,
                                     identifier.getFullName()));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index cae6e6f0e3..ae30fa569d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -37,6 +37,8 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
@@ -98,7 +100,6 @@ import 
org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,11 +111,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -595,17 +598,12 @@ public class FlinkCatalog extends AbstractCatalog {
             if (!oldTableNonPhysicalColumnIndex.containsKey(
                     ((ModifyPhysicalColumnType) 
change).getOldColumn().getName())) {
                 ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) 
change;
-                LogicalType newColumnType = 
modify.getNewType().getLogicalType();
-                LogicalType oldColumnType = 
modify.getOldColumn().getDataType().getLogicalType();
-                if (newColumnType.isNullable() != oldColumnType.isNullable()) {
-                    schemaChanges.add(
-                            SchemaChange.updateColumnNullability(
-                                    modify.getNewColumn().getName(), 
newColumnType.isNullable()));
-                }
-                schemaChanges.add(
-                        SchemaChange.updateColumnType(
-                                modify.getOldColumn().getName(),
-                                
LogicalTypeConversion.toDataType(newColumnType)));
+                generateNestedColumnUpdates(
+                        
Collections.singletonList(modify.getOldColumn().getName()),
+                        LogicalTypeConversion.toDataType(
+                                
modify.getOldColumn().getDataType().getLogicalType()),
+                        
LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()),
+                        schemaChanges);
             }
             return schemaChanges;
         } else if (change instanceof ModifyColumnPosition) {
@@ -670,6 +668,104 @@ public class FlinkCatalog extends AbstractCatalog {
         throw new UnsupportedOperationException("Change is not supported: " + 
change.getClass());
     }
 
+    private void generateNestedColumnUpdates(
+            List<String> fieldNames,
+            org.apache.paimon.types.DataType oldType,
+            org.apache.paimon.types.DataType newType,
+            List<SchemaChange> schemaChanges) {
+        if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
+            Preconditions.checkArgument(
+                    newType.getTypeRoot() == DataTypeRoot.ROW,
+                    "Column "
+                            + String.join(".", fieldNames)
+                            + " can only be updated to row type, and cannot be 
updated to "
+                            + newType
+                            + " type");
+            org.apache.paimon.types.RowType oldRowType = 
(org.apache.paimon.types.RowType) oldType;
+            org.apache.paimon.types.RowType newRowType = 
(org.apache.paimon.types.RowType) newType;
+
+            // check that existing fields have same order
+            Map<String, Integer> oldFieldOrders = new HashMap<>();
+            for (int i = 0; i < oldRowType.getFieldCount(); i++) {
+                oldFieldOrders.put(oldRowType.getFields().get(i).name(), i);
+            }
+            int lastIdx = -1;
+            String lastFieldName = "";
+            for (DataField newField : newRowType.getFields()) {
+                String name = newField.name();
+                if (oldFieldOrders.containsKey(name)) {
+                    int idx = oldFieldOrders.get(name);
+                    Preconditions.checkState(
+                            lastIdx < idx,
+                            "Order of existing fields in column %s must be 
kept the same. "
+                                    + "However, field %s and %s have changed 
their orders.",
+                            String.join(".", fieldNames),
+                            lastFieldName,
+                            name);
+                    lastIdx = idx;
+                    lastFieldName = name;
+                }
+            }
+
+            // drop fields
+            Set<String> newFieldNames = new 
HashSet<>(newRowType.getFieldNames());
+            for (String name : oldRowType.getFieldNames()) {
+                if (!newFieldNames.contains(name)) {
+                    List<String> dropColumnNames = new ArrayList<>(fieldNames);
+                    dropColumnNames.add(name);
+                    schemaChanges.add(
+                            
SchemaChange.dropColumn(dropColumnNames.toArray(new String[0])));
+                }
+            }
+
+            for (int i = 0; i < newRowType.getFieldCount(); i++) {
+                DataField field = newRowType.getFields().get(i);
+                String name = field.name();
+                List<String> fullFieldNames = new ArrayList<>(fieldNames);
+                fullFieldNames.add(name);
+                if (!oldFieldOrders.containsKey(name)) {
+                    // add fields
+                    SchemaChange.Move move;
+                    if (i == 0) {
+                        move = SchemaChange.Move.first(name);
+                    } else {
+                        String lastName = newRowType.getFields().get(i - 
1).name();
+                        move = SchemaChange.Move.after(name, lastName);
+                    }
+                    schemaChanges.add(
+                            SchemaChange.addColumn(
+                                    fullFieldNames.toArray(new String[0]),
+                                    field.type(),
+                                    field.description(),
+                                    move));
+                } else {
+                    // update existing fields
+                    DataField oldField = 
oldRowType.getFields().get(oldFieldOrders.get(name));
+                    if (!Objects.equals(oldField.description(), 
field.description())) {
+                        schemaChanges.add(
+                                SchemaChange.updateColumnComment(
+                                        fullFieldNames.toArray(new String[0]),
+                                        field.description()));
+                    }
+                    generateNestedColumnUpdates(
+                            fullFieldNames, oldField.type(), field.type(), 
schemaChanges);
+                }
+            }
+        } else {
+            if (!oldType.equalsIgnoreNullable(newType)) {
+                schemaChanges.add(
+                        SchemaChange.updateColumnType(
+                                fieldNames.toArray(new String[0]), newType, 
false));
+            }
+        }
+
+        if (oldType.isNullable() != newType.isNullable()) {
+            schemaChanges.add(
+                    SchemaChange.updateColumnNullability(
+                            fieldNames.toArray(new String[0]), 
newType.isNullable()));
+        }
+    }
+
     /**
      * Try handle change related to materialized table.
      *
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 08f79efccb..ba161fe840 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.format.DateTimeFormatter;
 import java.util.List;
@@ -35,6 +37,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for schema changes. */
@@ -1015,7 +1018,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase 
{
         sql("INSERT INTO T1 VALUES ('a', 'b', 'l')");
         sql("INSERT INTO T1 VALUES ('a', 'd', 'n')");
         sql("INSERT INTO T1 VALUES ('a', 'e', 'm')");
-        List<Row> sql = sql("select * from T1");
         assertThat(sql("select * from T1").toString()).isEqualTo("[+I[a, d, 
n]]");
 
         // test for get small record
@@ -1024,7 +1026,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase 
{
         sql("INSERT INTO T2 VALUES ('a', 'b', 1)");
         sql("INSERT INTO T2 VALUES ('a', 'd', 3)");
         sql("INSERT INTO T2 VALUES ('a', 'e', 2)");
-        sql = sql("select * from T2");
         assertThat(sql("select * from T2").toString()).isEqualTo("[+I[a, b, 
1]]");
 
         // test for get largest record
@@ -1033,7 +1034,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase 
{
         sql("INSERT INTO T3 VALUES ('a', 'b', 1.0)");
         sql("INSERT INTO T3 VALUES ('a', 'd', 3.0)");
         sql("INSERT INTO T3 VALUES ('a', 'e', 2.0)");
-        sql = sql("select * from T3");
         assertThat(sql("select * from T3").toString()).isEqualTo("[+I[a, d, 
3.0]]");
     }
 
@@ -1089,4 +1089,49 @@ public class SchemaChangeITCase extends 
CatalogITCaseBase {
                                 UnsupportedOperationException.class,
                                 "Cannot change bucket to -1."));
     }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNestedColumn(String formatType) {
+        sql(
+                "CREATE TABLE T "
+                        + "( k INT, v ROW(f1 INT, f2 ROW(f1 STRING, f2 INT NOT 
NULL)), PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( 'bucket' = '1', 'file.format' = '"
+                        + formatType
+                        + "' )");
+        sql(
+                "INSERT INTO T VALUES (1, ROW(10, ROW('apple', 100))), (2, 
ROW(20, ROW('banana', 200)))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, Row.of(10, Row.of("apple", 100))),
+                        Row.of(2, Row.of(20, Row.of("banana", 200))));
+
+        sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 ROW(f3 DOUBLE, f2 INT), 
f3 STRING))");
+        sql(
+                "INSERT INTO T VALUES "
+                        + "(1, ROW(1000000000001, ROW(101.0, 101), 'cat')), "
+                        + "(3, ROW(3000000000001, ROW(301.0, CAST(NULL AS 
INT)), 'dog'))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, Row.of(1000000000001L, Row.of(101.0, 101), 
"cat")),
+                        Row.of(2, Row.of(20L, Row.of(null, 200), null)),
+                        Row.of(3, Row.of(3000000000001L, Row.of(301.0, null), 
"dog")));
+
+        sql(
+                "ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 ROW(f3 DOUBLE, f1 
STRING, f2 INT), f3 STRING))");
+        sql(
+                "INSERT INTO T VALUES "
+                        + "(1, ROW(1000000000002, ROW(102.0, 'APPLE', 102), 
'cat')), "
+                        + "(4, ROW(4000000000002, ROW(402.0, 'LEMON', 402), 
'tiger'))");
+        assertThat(sql("SELECT k, v.f2.f1, v.f3 FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "APPLE", "cat"),
+                        Row.of(2, null, null),
+                        Row.of(3, null, "dog"),
+                        Row.of(4, "LEMON", "tiger"));
+
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 
INT, f3 STRING))"))
+                .hasRootCauseMessage(
+                        "Column v.f2 can only be updated to row type, and 
cannot be updated to INT type");
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5fde2c5659..89448c1f43 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -373,20 +373,20 @@ public class SparkCatalog extends SparkBaseCatalog 
implements SupportFunction {
             TableChange.AddColumn add = (TableChange.AddColumn) change;
             SchemaChange.Move move = getMove(add.position(), add.fieldNames());
             return SchemaChange.addColumn(
-                    Arrays.asList(add.fieldNames()),
+                    add.fieldNames(),
                     toPaimonType(add.dataType()).copy(add.isNullable()),
                     add.comment(),
                     move);
         } else if (change instanceof TableChange.RenameColumn) {
             TableChange.RenameColumn rename = (TableChange.RenameColumn) 
change;
-            return 
SchemaChange.renameColumn(Arrays.asList(rename.fieldNames()), rename.newName());
+            return SchemaChange.renameColumn(rename.fieldNames(), 
rename.newName());
         } else if (change instanceof TableChange.DeleteColumn) {
             TableChange.DeleteColumn delete = (TableChange.DeleteColumn) 
change;
-            return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
+            return SchemaChange.dropColumn(delete.fieldNames());
         } else if (change instanceof TableChange.UpdateColumnType) {
             TableChange.UpdateColumnType update = 
(TableChange.UpdateColumnType) change;
             return SchemaChange.updateColumnType(
-                    Arrays.asList(update.fieldNames()), 
toPaimonType(update.newDataType()), true);
+                    update.fieldNames(), toPaimonType(update.newDataType()), 
true);
         } else if (change instanceof TableChange.UpdateColumnNullability) {
             TableChange.UpdateColumnNullability update =
                     (TableChange.UpdateColumnNullability) change;


Reply via email to