This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 34153b386 [core][spark] Support renaming nested columns in Spark (#4489)
34153b386 is described below

commit 34153b386468d563250f3f3df7da5e57c69acf1d
Author: tsreaper <[email protected]>
AuthorDate: Mon Nov 11 15:19:15 2024 +0800

    [core][spark] Support renaming nested columns in Spark (#4489)
---
 .../org/apache/paimon/schema/SchemaChange.java     |  20 ++--
 .../org/apache/paimon/schema/SchemaManager.java    | 121 ++++++++++++++-------
 .../org/apache/paimon/schema/SchemaValidation.java |   8 +-
 .../org/apache/paimon/catalog/CatalogTestBase.java |  10 +-
 .../apache/paimon/schema/SchemaManagerTest.java    |  57 ++++++++++
 .../apache/paimon/table/SchemaEvolutionTest.java   |   2 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |   3 +-
 .../paimon/spark/SparkSchemaEvolutionITCase.java   |  29 +++++
 8 files changed, 189 insertions(+), 61 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 1b4c58e30..7e94b0a77 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -67,7 +67,11 @@ public interface SchemaChange extends Serializable {
     }
 
     static SchemaChange renameColumn(String fieldName, String newName) {
-        return new RenameColumn(fieldName, newName);
+        return new RenameColumn(Collections.singletonList(fieldName), newName);
+    }
+
+    static SchemaChange renameColumn(List<String> fieldNames, String newName) {
+        return new RenameColumn(fieldNames, newName);
     }
 
     static SchemaChange dropColumn(String fieldName) {
@@ -278,16 +282,16 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final String fieldName;
+        private final List<String> fieldNames;
         private final String newName;
 
-        private RenameColumn(String fieldName, String newName) {
-            this.fieldName = fieldName;
+        private RenameColumn(List<String> fieldNames, String newName) {
+            this.fieldNames = fieldNames;
             this.newName = newName;
         }
 
-        public String fieldName() {
-            return fieldName;
+        public List<String> fieldNames() {
+            return fieldNames;
         }
 
         public String newName() {
@@ -303,14 +307,14 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             RenameColumn that = (RenameColumn) o;
-            return Objects.equals(fieldName, that.fieldName)
+            return Objects.equals(fieldNames, that.fieldNames)
                     && Objects.equals(newName, that.newName);
         }
 
         @Override
         public int hashCode() {
             int result = Objects.hash(newName);
-            result = 31 * result + Objects.hashCode(fieldName);
+            result = 31 * result + Objects.hashCode(fieldNames);
             return result;
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 6b4127cee..5ffeca65d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -290,18 +290,11 @@ public class SchemaManager implements Serializable {
                     DataType dataType =
                             ReassignFieldId.reassign(addColumn.dataType(), 
highestFieldId);
 
-                    new 
NestedColumnModifier<Catalog.ColumnAlreadyExistException>(
-                            addColumn.fieldNames().toArray(new String[0])) {
+                    new 
NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) {
                         @Override
                         protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
                                 throws Catalog.ColumnAlreadyExistException {
-                            for (DataField field : newFields) {
-                                if (field.name().equals(fieldName)) {
-                                    throw new 
Catalog.ColumnAlreadyExistException(
-                                            
identifierFromPath(tableRoot.toString(), true, branch),
-                                            String.join(".", 
addColumn.fieldNames()));
-                                }
-                            }
+                            assertColumnNotExists(newFields, fieldName);
 
                             DataField dataField =
                                     new DataField(id, fieldName, dataType, 
addColumn.description());
@@ -327,34 +320,39 @@ public class SchemaManager implements Serializable {
                 } else if (change instanceof RenameColumn) {
                     RenameColumn rename = (RenameColumn) change;
                     renameColumnValidation(oldTableSchema, rename);
-                    if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
-                        throw new Catalog.ColumnAlreadyExistException(
-                                identifierFromPath(tableRoot.toString(), true, 
branch),
-                                rename.fieldName());
-                    }
+                    new NestedColumnModifier(rename.fieldNames().toArray(new 
String[0])) {
+                        @Override
+                        protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
+                                throws Catalog.ColumnNotExistException,
+                                        Catalog.ColumnAlreadyExistException {
+                            assertColumnExists(newFields, fieldName);
+                            assertColumnNotExists(newFields, rename.newName());
+                            for (int i = 0; i < newFields.size(); i++) {
+                                DataField field = newFields.get(i);
+                                if (!field.name().equals(fieldName)) {
+                                    continue;
+                                }
 
-                    updateNestedColumn(
-                            newFields,
-                            new String[] {rename.fieldName()},
-                            (field) ->
-                                    new DataField(
-                                            field.id(),
-                                            rename.newName(),
-                                            field.type(),
-                                            field.description()));
+                                DataField newField =
+                                        new DataField(
+                                                field.id(),
+                                                rename.newName(),
+                                                field.type(),
+                                                field.description());
+                                newFields.set(i, newField);
+                                return;
+                            }
+                        }
+                    }.updateIntermediateColumn(newFields, 0);
                 } else if (change instanceof DropColumn) {
                     DropColumn drop = (DropColumn) change;
                     dropColumnValidation(oldTableSchema, drop);
-                    new NestedColumnModifier<Catalog.ColumnNotExistException>(
-                            drop.fieldNames().toArray(new String[0])) {
+                    new NestedColumnModifier(drop.fieldNames().toArray(new 
String[0])) {
                         @Override
                         protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
                                 throws Catalog.ColumnNotExistException {
-                            if (!newFields.removeIf(f -> 
f.name().equals(fieldName))) {
-                                throw new Catalog.ColumnNotExistException(
-                                        
identifierFromPath(tableRoot.toString(), true, branch),
-                                        String.join(".", drop.fieldNames()));
-                            }
+                            assertColumnExists(newFields, fieldName);
+                            newFields.removeIf(f -> 
f.name().equals(fieldName));
                             if (newFields.isEmpty()) {
                                 throw new IllegalArgumentException(
                                         "Cannot drop all fields in table");
@@ -438,7 +436,7 @@ public class SchemaManager implements Serializable {
                     new Schema(
                             newFields,
                             oldTableSchema.partitionKeys(),
-                            applyColumnRename(
+                            applyNotNestedColumnRename(
                                     oldTableSchema.primaryKeys(),
                                     Iterables.filter(changes, 
RenameColumn.class)),
                             applySchemaChanges(newOptions, changes),
@@ -553,7 +551,8 @@ public class SchemaManager implements Serializable {
         if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
             List<String> bucketColumns = 
Arrays.asList(bucketKeysStr.split(","));
             List<String> newBucketColumns =
-                    applyColumnRename(bucketColumns, Iterables.filter(changes, 
RenameColumn.class));
+                    applyNotNestedColumnRename(
+                            bucketColumns, Iterables.filter(changes, 
RenameColumn.class));
             newOptions.put(BUCKET_KEY.key(), 
Joiner.on(',').join(newBucketColumns));
         }
 
@@ -561,9 +560,9 @@ public class SchemaManager implements Serializable {
         return newOptions;
     }
 
-    // Apply column rename changes to the list of column names, this will not 
change the order of
-    // the column names
-    private static List<String> applyColumnRename(
+    // Apply column rename changes on not nested columns to the list of column 
names, this will not
+    // change the order of the column names
+    private static List<String> applyNotNestedColumnRename(
             List<String> columns, Iterable<RenameColumn> renames) {
         if (Iterables.isEmpty(renames)) {
             return columns;
@@ -571,7 +570,9 @@ public class SchemaManager implements Serializable {
 
         Map<String, String> columnNames = Maps.newHashMap();
         for (RenameColumn renameColumn : renames) {
-            columnNames.put(renameColumn.fieldName(), renameColumn.newName());
+            if (renameColumn.fieldNames().size() == 1) {
+                columnNames.put(renameColumn.fieldNames().get(0), 
renameColumn.newName());
+            }
         }
 
         // The order of the column names will be preserved, as a non-parallel 
stream is used here.
@@ -594,14 +595,18 @@ public class SchemaManager implements Serializable {
     }
 
     private static void renameColumnValidation(TableSchema schema, 
RenameColumn change) {
-        String columnToRename = change.fieldName();
+        // partition keys can't be nested columns
+        if (change.fieldNames().size() > 1) {
+            return;
+        }
+        String columnToRename = change.fieldNames().get(0);
         if (schema.partitionKeys().contains(columnToRename)) {
             throw new UnsupportedOperationException(
                     String.format("Cannot rename partition column: [%s]", 
columnToRename));
         }
     }
 
-    private abstract class NestedColumnModifier<E extends Exception> {
+    private abstract class NestedColumnModifier {
 
         private final String[] updateFieldNames;
 
@@ -610,7 +615,7 @@ public class SchemaManager implements Serializable {
         }
 
         public void updateIntermediateColumn(List<DataField> newFields, int 
depth)
-                throws Catalog.ColumnNotExistException, E {
+                throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException {
             if (depth == updateFieldNames.length - 1) {
                 updateLastColumn(newFields, updateFieldNames[depth]);
                 return;
@@ -643,15 +648,47 @@ public class SchemaManager implements Serializable {
         }
 
         protected abstract void updateLastColumn(List<DataField> newFields, 
String fieldName)
-                throws E;
+                throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException;
+
+        protected void assertColumnExists(List<DataField> newFields, String 
fieldName)
+                throws Catalog.ColumnNotExistException {
+            for (DataField field : newFields) {
+                if (field.name().equals(fieldName)) {
+                    return;
+                }
+            }
+            throw new Catalog.ColumnNotExistException(
+                    identifierFromPath(tableRoot.toString(), true, branch),
+                    getLastFieldName(fieldName));
+        }
+
+        protected void assertColumnNotExists(List<DataField> newFields, String 
fieldName)
+                throws Catalog.ColumnAlreadyExistException {
+            for (DataField field : newFields) {
+                if (field.name().equals(fieldName)) {
+                    throw new Catalog.ColumnAlreadyExistException(
+                            identifierFromPath(tableRoot.toString(), true, 
branch),
+                            getLastFieldName(fieldName));
+                }
+            }
+        }
+
+        private String getLastFieldName(String fieldName) {
+            List<String> fieldNames = new ArrayList<>();
+            for (int i = 0; i + 1 < updateFieldNames.length; i++) {
+                fieldNames.add(updateFieldNames[i]);
+            }
+            fieldNames.add(fieldName);
+            return String.join(".", fieldNames);
+        }
     }
 
     private void updateNestedColumn(
             List<DataField> newFields,
             String[] updateFieldNames,
             Function<DataField, DataField> updateFunc)
-            throws Catalog.ColumnNotExistException {
-        new 
NestedColumnModifier<Catalog.ColumnNotExistException>(updateFieldNames) {
+            throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException {
+        new NestedColumnModifier(updateFieldNames) {
             @Override
             protected void updateLastColumn(List<DataField> newFields, String 
fieldName)
                     throws Catalog.ColumnNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index bb9b440ce..20cbdea66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -515,7 +515,7 @@ public class SchemaValidation {
 
     private static void validateSequenceField(TableSchema schema, CoreOptions 
options) {
         List<String> sequenceField = options.sequenceField();
-        if (sequenceField.size() > 0) {
+        if (!sequenceField.isEmpty()) {
             Map<String, Integer> fieldCount =
                     sequenceField.stream()
                             .collect(Collectors.toMap(field -> field, field -> 
1, Integer::sum));
@@ -596,12 +596,12 @@ public class SchemaValidation {
                                                                         == MAP
                                                                 || 
dataField.type().getTypeRoot()
                                                                         == 
ROW))
-                                .map(dataField -> dataField.name())
+                                .map(DataField::name)
                                 .collect(Collectors.toList());
-                if (nestedFields.size() > 0) {
+                if (!nestedFields.isEmpty()) {
                     throw new RuntimeException(
                             "nested type can not in bucket-key, in your table 
these key are "
-                                    + nestedFields.toString());
+                                    + nestedFields);
                 }
             }
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 24eefbcb6..3bba6d562 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -513,7 +513,9 @@ public abstract class CatalogTestBase {
         catalog.createTable(
                 identifier,
                 new Schema(
-                        Lists.newArrayList(new DataField(0, "col1", 
DataTypes.STRING())),
+                        Lists.newArrayList(
+                                new DataField(0, "col1", DataTypes.STRING()),
+                                new DataField(1, "col2", DataTypes.STRING())),
                         Collections.emptyList(),
                         Collections.emptyList(),
                         Maps.newHashMap(),
@@ -525,7 +527,7 @@ public abstract class CatalogTestBase {
                 false);
         Table table = catalog.getTable(identifier);
 
-        assertThat(table.rowType().getFields()).hasSize(1);
+        assertThat(table.rowType().getFields()).hasSize(2);
         assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
         assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
 
@@ -536,12 +538,12 @@ public abstract class CatalogTestBase {
                                 catalog.alterTable(
                                         identifier,
                                         Lists.newArrayList(
-                                                
SchemaChange.renameColumn("col1", "new_col1")),
+                                                
SchemaChange.renameColumn("col2", "new_col1")),
                                         false))
                 .satisfies(
                         anyCauseMatches(
                                 Catalog.ColumnAlreadyExistException.class,
-                                "Column col1 already exists in the 
test_db.test_table table."));
+                                "Column new_col1 already exists in the 
test_db.test_table table."));
 
         // Alter table renames a column throws ColumnNotExistException when 
column does not exist
         assertThatThrownBy(
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 1a175de24..5fb76387e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -606,4 +606,61 @@ public class SchemaManagerTest {
         assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistDropColumn))
                 .hasMessageContaining("Column v.invalid does not exist");
     }
+
+    @Test
+    public void testRenameNestedColumns() throws Exception {
+        RowType innerType =
+                RowType.of(
+                        new DataField(4, "f1", DataTypes.INT()),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        RowType middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        RowType outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new 
DataField(1, "v", middleType));
+
+        Schema schema =
+                new Schema(
+                        outerType.getFields(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+        manager.createTable(schema);
+
+        SchemaChange renameColumn =
+                SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"), 
"f100");
+        manager.commitChanges(renameColumn);
+
+        innerType =
+                RowType.of(
+                        new DataField(4, "f100", DataTypes.INT()),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new 
DataField(1, "v", middleType));
+        
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+        SchemaChange middleColumnNotExistRenameColumn =
+                SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"), 
"f200");
+        assertThatCode(() -> 
manager.commitChanges(middleColumnNotExistRenameColumn))
+                .hasMessageContaining("Column v.invalid does not exist");
+
+        SchemaChange lastColumnNotExistRenameColumn =
+                SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"), 
"new_invalid");
+        assertThatCode(() -> 
manager.commitChanges(lastColumnNotExistRenameColumn))
+                .hasMessageContaining("Column v.f2.invalid does not exist");
+
+        SchemaChange newNameAlreadyExistRenameColumn =
+                SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"), 
"f100");
+        assertThatCode(() -> 
manager.commitChanges(newNameAlreadyExistRenameColumn))
+                .hasMessageContaining("Column v.f2.f100 already exists");
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 44c3beea7..951539299 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -353,7 +353,7 @@ public class SchemaEvolutionTest {
                 .hasMessage(
                         String.format(
                                 "Column %s already exists in the %s table.",
-                                "f0", identifier.getFullName()));
+                                "f1", identifier.getFullName()));
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 2ac1d032c..82c8939ea 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -379,8 +379,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
                     move);
         } else if (change instanceof TableChange.RenameColumn) {
             TableChange.RenameColumn rename = (TableChange.RenameColumn) 
change;
-            validateAlterNestedField(rename.fieldNames());
-            return SchemaChange.renameColumn(rename.fieldNames()[0], 
rename.newName());
+            return 
SchemaChange.renameColumn(Arrays.asList(rename.fieldNames()), rename.newName());
         } else if (change instanceof TableChange.DeleteColumn) {
             TableChange.DeleteColumn delete = (TableChange.DeleteColumn) 
change;
             return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index e876a0027..ccae59e88 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -788,4 +788,33 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         "[4,[42,[402,null,4002],four]]",
                         "[5,[53,[503,500.03,5003],five]]");
     }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testRenameNestedColumn(String formatType) {
+        String tableName = "testRenameNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: 
STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, 
STRUCT(20, STRUCT('banana', 200)))");
+        assertThat(
+                        spark.sql("SELECT v.f2.f1, k FROM paimon.default." + 
tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
+
+        spark.sql("ALTER TABLE paimon.default." + tableName + " RENAME COLUMN 
v.f2.f1 to f100");
+        assertThat(
+                        spark.sql("SELECT v.f2.f100, k FROM paimon.default." + 
tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
+    }
 }

Reply via email to