This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 77b7d8d4dc [spark] Support changing column types in array<struct> or map<?,struct> (#4618)
77b7d8d4dc is described below

commit 77b7d8d4dc11cdb5b4b86712faa1e8d37e30af86
Author: tsreaper <[email protected]>
AuthorDate: Mon Dec 2 15:41:03 2024 +0800

    [spark] Support changing column types in array<struct> or map<?,struct> (#4618)
---
 .../org/apache/paimon/schema/SchemaManager.java    |  23 ++-
 .../apache/paimon/schema/SchemaManagerTest.java    |   8 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  10 +-
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 224 +++++++++++++++++++++
 4 files changed, 253 insertions(+), 12 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index d827ffd0fb..83ddbccfef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -639,9 +639,10 @@ public class SchemaManager implements Serializable {
 
                 String fullFieldName =
                         String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1));
-                List<DataField> nestedFields =
-                        new ArrayList<>(extractRowType(field.type(), fullFieldName).getFields());
-                updateIntermediateColumn(nestedFields, depth + 1);
+                List<DataField> nestedFields = new ArrayList<>();
+                int newDepth =
+                        depth + extractRowDataFields(field.type(), fullFieldName, nestedFields);
+                updateIntermediateColumn(nestedFields, newDepth);
                 newFields.set(
                         i,
                         new DataField(
@@ -657,14 +658,22 @@ public class SchemaManager implements Serializable {
                     String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1)));
         }
 
-        private RowType extractRowType(DataType type, String fullFieldName) {
+        private int extractRowDataFields(
+                DataType type, String fullFieldName, List<DataField> nestedFields) {
             switch (type.getTypeRoot()) {
                 case ROW:
-                    return (RowType) type;
+                    nestedFields.addAll(((RowType) type).getFields());
+                    return 1;
                 case ARRAY:
-                    return extractRowType(((ArrayType) type).getElementType(), fullFieldName);
+                    return extractRowDataFields(
+                                    ((ArrayType) type).getElementType(),
+                                    fullFieldName,
+                                    nestedFields)
+                            + 1;
                 case MAP:
-                    return extractRowType(((MapType) type).getValueType(), fullFieldName);
+                    return extractRowDataFields(
+                                    ((MapType) type).getValueType(), fullFieldName, nestedFields)
+                            + 1;
                 default:
                     throw new IllegalArgumentException(
                             fullFieldName + " is not a structured type.");
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index f0d6543699..c8b102b358 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -738,13 +738,15 @@ public class SchemaManagerTest {
 
         SchemaChange addColumn =
                 SchemaChange.addColumn(
-                        new String[] {"v", "f3"},
+                        new String[] {"v", "element", "value", "f3"},
                         DataTypes.STRING(),
                         null,
                         SchemaChange.Move.first("f3"));
-        SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v", "f2"});
+        SchemaChange dropColumn =
+                SchemaChange.dropColumn(new String[] {"v", "element", "value", "f2"});
         SchemaChange updateColumnType =
-                SchemaChange.updateColumnType(new String[] {"v", "f1"}, DataTypes.BIGINT(), false);
+                SchemaChange.updateColumnType(
+                        new String[] {"v", "element", "value", "f1"}, DataTypes.BIGINT(), false);
         manager.commitChanges(addColumn, dropColumn, updateColumnType);
 
         innerType =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 09fc0328ef..c67e79c1c0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -756,8 +756,11 @@ public class FlinkCatalog extends AbstractCatalog {
                     "Column %s can only be updated to array type, and cannot 
be updated to %s type",
                     joinedNames,
                     newType);
+            List<String> fullFieldNames = new ArrayList<>(fieldNames);
+            // add a dummy column name indicating the element of array
+            fullFieldNames.add("element");
             generateNestedColumnUpdates(
-                    fieldNames,
+                    fullFieldNames,
                    ((org.apache.paimon.types.ArrayType) oldType).getElementType(),
                    ((org.apache.paimon.types.ArrayType) newType).getElementType(),
                     schemaChanges);
@@ -775,8 +778,11 @@ public class FlinkCatalog extends AbstractCatalog {
                     joinedNames,
                     oldMapType.getKeyType(),
                     newMapType.getKeyType());
+            List<String> fullFieldNames = new ArrayList<>(fieldNames);
+            // add a dummy column name indicating the value of map
+            fullFieldNames.add("value");
             generateNestedColumnUpdates(
-                    fieldNames,
+                    fullFieldNames,
                     oldMapType.getValueType(),
                     newMapType.getValueType(),
                     schemaChanges);
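
Flink DDL paths do not carry these dummy segments, so the catalog appends them
before recursing into the element/value struct. The resulting paths match what
SchemaManager now expects; for instance, the updated SchemaManagerTest above
addresses f1 through an array-of-map nesting like this:

    // "element" names the array's element type and "value" the map's value
    // type, so this path reaches f1 inside v ARRAY<MAP<?, STRUCT<f1, ...>>>.
    SchemaChange updateColumnType =
            SchemaChange.updateColumnType(
                    new String[] {"v", "element", "value", "f1"}, DataTypes.BIGINT(), false);
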
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 771ddc6287..fb4dab38ed 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -789,6 +789,89 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[5,[53,[503,500.03,5003],five]]");
     }
 
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testAddAndDropNestedColumnInArray(String formatType) {
+        String tableName = "testAddNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
+                        + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,WrappedArray([apple,100], [banana,101])]",
+                        "[2,WrappedArray([cat,200], [dog,201])]");
+
+        spark.sql(
+                "ALTER TABLE paimon.default."
+                        + tableName
+                        + " ADD COLUMN v.element.f3 STRING AFTER f2");
+        spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN v.element.f1");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111, 'BANANA'))), "
+                        + "(3, ARRAY(STRUCT(310, 'FLOWER')))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,WrappedArray([110,APPLE], [111,BANANA])]",
+                        "[2,WrappedArray([200,null], [201,null])]",
+                        "[3,WrappedArray([310,FLOWER])]");
+    }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testAddAndDropNestedColumnInMap(String formatType) {
+        String tableName = "testAddNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), "
+                        + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))");
+        assertThat(
+                        spark.sql("SELECT k, v[10].f1, v[10].f2 FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");
+
+        spark.sql(
+                "ALTER TABLE paimon.default."
+                        + tableName
+                        + " ADD COLUMN v.value.f3 STRING AFTER f2");
+        spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN v.value.f1");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, MAP(10, STRUCT(110, 'APPLE'), 20, STRUCT(111, 'BANANA'))), "
+                        + "(3, MAP(10, STRUCT(310, 'FLOWER')))");
+        assertThat(
+                        spark.sql("SELECT k, v[10].f2, v[10].f3 FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[1,110,APPLE]", "[2,200,null]", "[3,310,FLOWER]");
+    }
+
     @ParameterizedTest()
     @ValueSource(strings = {"orc", "avro", "parquet"})
     public void testRenameNestedColumn(String formatType) {
@@ -818,6 +901,67 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                 .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
     }
 
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testRenameNestedColumnInArray(String formatType) {
+        String tableName = "testRenameNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
+                        + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
+        assertThat(
+                        spark.sql("SELECT v[0].f1, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+
+        spark.sql(
+                "ALTER TABLE paimon.default." + tableName + " RENAME COLUMN v.element.f1 to f100");
+        assertThat(
+                        spark.sql("SELECT v[0].f100, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+    }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testRenameNestedColumnInMap(String formatType) {
+        String tableName = "testRenameNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), "
+                        + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))");
+        assertThat(
+                        spark.sql("SELECT v[10].f1, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+
+        spark.sql("ALTER TABLE paimon.default." + tableName + " RENAME COLUMN v.value.f1 to f100");
+        assertThat(
+                        spark.sql("SELECT v[10].f100, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[cat,2]");
+    }
+
     @ParameterizedTest()
     @ValueSource(strings = {"orc", "avro", "parquet"})
     public void testUpdateNestedColumnType(String formatType) {
@@ -850,4 +994,84 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 .map(Row::toString))
                 .containsExactlyInAnyOrder("[101,1]", "[200,2]", 
"[3000000000000,3]");
     }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNestedColumnTypeInArray(String formatType) {
+        String tableName = "testRenameNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
+                        + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,WrappedArray([apple,100], [banana,101])]",
+                        "[2,WrappedArray([cat,200], [dog,201])]");
+
+        spark.sql(
+                "ALTER TABLE paimon.default."
+                        + tableName
+                        + " CHANGE COLUMN v.element.f2 f2 BIGINT");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000), STRUCT('BANANA', 111))), "
+                        + "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,WrappedArray([APPLE,1000000000000], [BANANA,111])]",
+                        "[2,WrappedArray([cat,200], [dog,201])]",
+                        "[3,WrappedArray([FLOWER,3000000000000])]");
+    }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNestedColumnTypeInMap(String formatType) {
+        String tableName = "testRenameNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v MAP<INT, STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, MAP(10, STRUCT('apple', 100), 20, STRUCT('banana', 101))), "
+                        + "(2, MAP(10, STRUCT('cat', 200), 20, STRUCT('dog', 201)))");
+        assertThat(
+                        spark.sql("SELECT k, v[10].f1, v[10].f2 FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[1,apple,100]", "[2,cat,200]");
+
+        spark.sql(
+                "ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN v.value.f2 f2 BIGINT");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, MAP(10, STRUCT('APPLE', 1000000000000), 20, STRUCT('BANANA', 111))), "
+                        + "(3, MAP(10, STRUCT('FLOWER', 3000000000000)))");
+        assertThat(
+                        spark.sql("SELECT k, v[10].f1, v[10].f2 FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,APPLE,1000000000000]", "[2,cat,200]", "[3,FLOWER,3000000000000]");
+    }
 }
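
Net effect for users: Spark DDL can now reach struct fields through array and
map wrappers. A minimal usage sketch (catalog and table names are illustrative;
the statements mirror the tests above):

    spark.sql("CREATE TABLE paimon.default.t (k INT NOT NULL, v ARRAY<STRUCT<f1: STRING, f2: INT>>)");
    // "element" addresses the array's element struct:
    spark.sql("ALTER TABLE paimon.default.t ADD COLUMN v.element.f3 STRING AFTER f2");
    spark.sql("ALTER TABLE paimon.default.t CHANGE COLUMN v.element.f2 f2 BIGINT");
    // For MAP<?, STRUCT<...>> columns, "value" addresses the value struct instead:
    spark.sql("ALTER TABLE paimon.default.t2 CHANGE COLUMN v.value.f2 f2 BIGINT");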
