This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ee8b1b198 [core][spark] Support updating nested column types in Spark (#4494)
ee8b1b198 is described below

commit ee8b1b198d3c65d68df15cda59dc0a0c31f63888
Author: tsreaper <[email protected]>
AuthorDate: Mon Nov 11 20:32:57 2024 +0800

    [core][spark] Support updating nested column types in Spark (#4494)
---
 .../org/apache/paimon/schema/SchemaChange.java     | 25 +++++++----
 .../org/apache/paimon/schema/SchemaManager.java    | 28 ++++---------
 .../org/apache/paimon/catalog/CatalogTestBase.java |  4 +-
 .../apache/paimon/schema/SchemaManagerTest.java    | 49 ++++++++++++++++++++++
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |  9 ++--
 .../java/org/apache/paimon/spark/SparkCatalog.java | 10 +----
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 33 +++++++++++++++
 7 files changed, 116 insertions(+), 42 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 7e94b0a77..1c1d601bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -83,12 +83,18 @@ public interface SchemaChange extends Serializable {
     }
 
     static SchemaChange updateColumnType(String fieldName, DataType 
newDataType) {
-        return new UpdateColumnType(fieldName, newDataType, false);
+        return new UpdateColumnType(Collections.singletonList(fieldName), 
newDataType, false);
     }
 
     static SchemaChange updateColumnType(
             String fieldName, DataType newDataType, boolean keepNullability) {
-        return new UpdateColumnType(fieldName, newDataType, keepNullability);
+        return new UpdateColumnType(
+                Collections.singletonList(fieldName), newDataType, 
keepNullability);
+    }
+
+    static SchemaChange updateColumnType(
+            List<String> fieldNames, DataType newDataType, boolean 
keepNullability) {
+        return new UpdateColumnType(fieldNames, newDataType, keepNullability);
     }
 
     static SchemaChange updateColumnNullability(String fieldName, boolean 
newNullability) {
@@ -357,19 +363,20 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final String fieldName;
+        private final List<String> fieldNames;
         private final DataType newDataType;
         // If true, do not change the target field nullability
         private final boolean keepNullability;
 
-        private UpdateColumnType(String fieldName, DataType newDataType, 
boolean keepNullability) {
-            this.fieldName = fieldName;
+        private UpdateColumnType(
+                List<String> fieldNames, DataType newDataType, boolean 
keepNullability) {
+            this.fieldNames = fieldNames;
             this.newDataType = newDataType;
             this.keepNullability = keepNullability;
         }
 
-        public String fieldName() {
-            return fieldName;
+        public List<String> fieldNames() {
+            return fieldNames;
         }
 
         public DataType newDataType() {
@@ -389,14 +396,14 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             UpdateColumnType that = (UpdateColumnType) o;
-            return Objects.equals(fieldName, that.fieldName)
+            return Objects.equals(fieldNames, that.fieldNames)
                     && newDataType.equals(that.newDataType);
         }
 
         @Override
         public int hashCode() {
             int result = Objects.hash(newDataType);
-            result = 31 * result + Objects.hashCode(fieldName);
+            result = 31 * result + Objects.hashCode(fieldNames);
             return result;
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 5ffeca65d..86ed96d5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -319,7 +319,7 @@ public class SchemaManager implements Serializable {
                     }.updateIntermediateColumn(newFields, 0);
                 } else if (change instanceof RenameColumn) {
                     RenameColumn rename = (RenameColumn) change;
-                    renameColumnValidation(oldTableSchema, rename);
+                    assertNotUpdatingPrimaryKeys(oldTableSchema, 
rename.fieldNames(), "rename");
                     new NestedColumnModifier(rename.fieldNames().toArray(new 
String[0])) {
                         @Override
                         protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
@@ -361,15 +361,10 @@ public class SchemaManager implements Serializable {
                     }.updateIntermediateColumn(newFields, 0);
                 } else if (change instanceof UpdateColumnType) {
                     UpdateColumnType update = (UpdateColumnType) change;
-                    if 
(oldTableSchema.partitionKeys().contains(update.fieldName())) {
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "Cannot update partition column [%s] 
type in the table[%s].",
-                                        update.fieldName(), 
tableRoot.getName()));
-                    }
+                    assertNotUpdatingPrimaryKeys(oldTableSchema, 
update.fieldNames(), "update");
                     updateNestedColumn(
                             newFields,
-                            new String[] {update.fieldName()},
+                            update.fieldNames().toArray(new String[0]),
                             (field) -> {
                                 DataType targetType = update.newDataType();
                                 if (update.keepNullability()) {
@@ -382,13 +377,6 @@ public class SchemaManager implements Serializable {
                                         String.format(
                                                 "Column type %s[%s] cannot be 
converted to %s without loosing information.",
                                                 field.name(), field.type(), 
targetType));
-                                AtomicInteger dummyId = new AtomicInteger(0);
-                                if (dummyId.get() != 0) {
-                                    throw new RuntimeException(
-                                            String.format(
-                                                    "Update column to nested 
row type '%s' is not supported.",
-                                                    targetType));
-                                }
                                 return new DataField(
                                         field.id(), field.name(), targetType, 
field.description());
                             });
@@ -594,15 +582,17 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    private static void renameColumnValidation(TableSchema schema, 
RenameColumn change) {
+    private static void assertNotUpdatingPrimaryKeys(
+            TableSchema schema, List<String> fieldNames, String operation) {
         // partition keys can't be nested columns
-        if (change.fieldNames().size() > 1) {
+        if (fieldNames.size() > 1) {
             return;
         }
-        String columnToRename = change.fieldNames().get(0);
+        String columnToRename = fieldNames.get(0);
         if (schema.partitionKeys().contains(columnToRename)) {
             throw new UnsupportedOperationException(
-                    String.format("Cannot rename partition column: [%s]", 
columnToRename));
+                    String.format(
+                            "Cannot " + operation + " partition column: [%s]", 
columnToRename));
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 3bba6d562..f130920a7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -675,8 +675,8 @@ public abstract class CatalogTestBase {
                                         false))
                 .satisfies(
                         anyCauseMatches(
-                                IllegalArgumentException.class,
-                                "Cannot update partition column [dt] type in 
the table"));
+                                UnsupportedOperationException.class,
+                                "Cannot update partition column: [dt]"));
     }
 
     @Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 5fb76387e..ac8d4cd91 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -663,4 +663,53 @@ public class SchemaManagerTest {
         assertThatCode(() -> 
manager.commitChanges(newNameAlreadyExistRenameColumn))
                 .hasMessageContaining("Column v.f2.f100 already exists");
     }
+
+    @Test
+    public void testUpdateNestedColumnType() throws Exception {
+        RowType innerType =
+                RowType.of(
+                        new DataField(4, "f1", DataTypes.INT()),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        RowType middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        RowType outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new 
DataField(1, "v", middleType));
+
+        Schema schema =
+                new Schema(
+                        outerType.getFields(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+        manager.createTable(schema);
+
+        SchemaChange updateColumnType =
+                SchemaChange.updateColumnType(
+                        Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), 
true);
+        manager.commitChanges(updateColumnType);
+
+        innerType =
+                RowType.of(
+                        new DataField(4, "f1", DataTypes.BIGINT()),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new 
DataField(1, "v", middleType));
+        
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+        SchemaChange middleColumnNotExistUpdateColumnType =
+                SchemaChange.updateColumnType(
+                        Arrays.asList("v", "invalid", "f1"), 
DataTypes.BIGINT(), true);
+        assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistUpdateColumnType))
+                .hasMessageContaining("Column v.invalid does not exist");
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 77c49e8f3..0e93fdb07 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -100,6 +100,9 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
             SchemaChange.UpdateColumnType updateColumnType =
                     (SchemaChange.UpdateColumnType) schemaChange;
+            Preconditions.checkState(
+                    updateColumnType.fieldNames().size() == 1,
+                    "Paimon CDC currently does not support nested type schema 
evolution.");
             TableSchema schema =
                     schemaManager
                             .latest()
@@ -107,11 +110,11 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                                     () ->
                                             new RuntimeException(
                                                     "Table does not exist. 
This is unexpected."));
-            int idx = 
schema.fieldNames().indexOf(updateColumnType.fieldName());
+            int idx = 
schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
             Preconditions.checkState(
                     idx >= 0,
                     "Field name "
-                            + updateColumnType.fieldName()
+                            + updateColumnType.fieldNames().get(0)
                             + " does not exist in table. This is unexpected.");
             DataType oldType = schema.fields().get(idx).type();
             DataType newType = updateColumnType.newDataType();
@@ -123,7 +126,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                     throw new UnsupportedOperationException(
                             String.format(
                                     "Cannot convert field %s from type %s to 
%s of Paimon table %s.",
-                                    updateColumnType.fieldName(),
+                                    updateColumnType.fieldNames().get(0),
                                     oldType,
                                     newType,
                                     identifier.getFullName()));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 82c8939ea..5fde2c565 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -385,9 +385,8 @@ public class SparkCatalog extends SparkBaseCatalog 
implements SupportFunction {
             return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
         } else if (change instanceof TableChange.UpdateColumnType) {
             TableChange.UpdateColumnType update = 
(TableChange.UpdateColumnType) change;
-            validateAlterNestedField(update.fieldNames());
             return SchemaChange.updateColumnType(
-                    update.fieldNames()[0], 
toPaimonType(update.newDataType()), true);
+                    Arrays.asList(update.fieldNames()), 
toPaimonType(update.newDataType()), true);
         } else if (change instanceof TableChange.UpdateColumnNullability) {
             TableChange.UpdateColumnNullability update =
                     (TableChange.UpdateColumnNullability) change;
@@ -449,13 +448,6 @@ public class SparkCatalog extends SparkBaseCatalog 
implements SupportFunction {
         return schemaBuilder.build();
     }
 
-    private void validateAlterNestedField(String[] fieldNames) {
-        if (fieldNames.length > 1) {
-            throw new UnsupportedOperationException(
-                    "Alter nested column is not supported: " + 
Arrays.toString(fieldNames));
-        }
-    }
-
     private void validateAlterProperty(String alterKey) {
         if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
             throw new UnsupportedOperationException("Alter primary key is not 
supported");
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index ccae59e88..771ddc628 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -817,4 +817,37 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                                 .map(Row::toString))
                 .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
     }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNestedColumnType(String formatType) {
+        String tableName = "testRenameNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: 
STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 
'file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, 
STRUCT(20, STRUCT('banana', 200)))");
+        assertThat(
+                        spark.sql("SELECT v.f2.f2, k FROM paimon.default." + 
tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[100,1]", "[200,2]");
+
+        spark.sql("ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN 
v.f2.f2 f2 BIGINT");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101))), (3, 
STRUCT(31, STRUCT('CHERRY', 3000000000000)))");
+        assertThat(
+                        spark.sql("SELECT v.f2.f2, k FROM paimon.default." + 
tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[101,1]", "[200,2]", 
"[3000000000000,3]");
+    }
 }

Reply via email to