This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b0774556 [core][spark] Support adding and dropping nested columns in Spark (#4483)
5b0774556 is described below

commit 5b0774556a9a08c3c0bb0cef54ea3c7ea0182094
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Fri Nov 8 19:37:06 2024 +0800

    [core][spark] Support adding and dropping nested columns in Spark (#4483)
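
    For context, a minimal sketch of the user-facing capability, using an
    illustrative table name t; the statements mirror the
    SparkSchemaEvolutionITCase changes included in this commit:

        // Spark DDL can now address columns inside nested STRUCT types.
        spark.sql("ALTER TABLE paimon.default.t ADD COLUMN v.f2.f3 BIGINT");
        spark.sql("ALTER TABLE paimon.default.t DROP COLUMN v.f2.f1");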
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |   2 +-
 .../org/apache/paimon/schema/SchemaChange.java     |  46 +++--
 .../apache/paimon/schema/SchemaEvolutionUtil.java  |  49 ++++-
 .../org/apache/paimon/schema/SchemaManager.java    | 229 ++++++++++++---------
 .../org/apache/paimon/catalog/CatalogTestBase.java |   8 +-
 .../apache/paimon/schema/SchemaManagerTest.java    |  79 +++++++
 .../java/org/apache/paimon/spark/SparkCatalog.java |   6 +-
 .../org/apache/paimon/spark/SparkReadITCase.java   |   5 +
 .../paimon/spark/SparkSchemaEvolutionITCase.java   |  83 ++++++++
 9 files changed, 375 insertions(+), 132 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c5cea0c21..4c36ad4db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -536,7 +536,7 @@ public abstract class AbstractCatalog implements Catalog {
         for (SchemaChange change : changes) {
             if (change instanceof SchemaChange.AddColumn) {
                 SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) change;
-                fieldNames.add(addColumn.fieldName());
+                fieldNames.addAll(addColumn.fieldNames());
             } else if (change instanceof SchemaChange.RenameColumn) {
                 SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change;
                 fieldNames.add(rename.newName());
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 1e790bf65..1b4c58e30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -25,6 +25,8 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -52,11 +54,16 @@ public interface SchemaChange extends Serializable {
     }
 
     static SchemaChange addColumn(String fieldName, DataType dataType, String comment) {
-        return new AddColumn(fieldName, dataType, comment, null);
+        return new AddColumn(Collections.singletonList(fieldName), dataType, comment, null);
     }
 
     static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) {
-        return new AddColumn(fieldName, dataType, comment, move);
+        return new AddColumn(Collections.singletonList(fieldName), dataType, comment, move);
+    }
+
+    static SchemaChange addColumn(
+            List<String> fieldNames, DataType dataType, String comment, Move move) {
+        return new AddColumn(fieldNames, dataType, comment, move);
     }
 
     static SchemaChange renameColumn(String fieldName, String newName) {
@@ -64,7 +71,11 @@ public interface SchemaChange extends Serializable {
     }
 
     static SchemaChange dropColumn(String fieldName) {
-        return new DropColumn(fieldName);
+        return new DropColumn(Collections.singletonList(fieldName));
+    }
+
+    static SchemaChange dropColumn(List<String> fieldNames) {
+        return new DropColumn(fieldNames);
     }
 
     static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
@@ -207,20 +218,21 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final String fieldName;
+        private final List<String> fieldNames;
         private final DataType dataType;
         private final String description;
         private final Move move;
 
-        private AddColumn(String fieldName, DataType dataType, String description, Move move) {
-            this.fieldName = fieldName;
+        private AddColumn(
+                List<String> fieldNames, DataType dataType, String description, Move move) {
+            this.fieldNames = fieldNames;
             this.dataType = dataType;
             this.description = description;
             this.move = move;
         }
 
-        public String fieldName() {
-            return fieldName;
+        public List<String> fieldNames() {
+            return fieldNames;
         }
 
         public DataType dataType() {
@@ -246,7 +258,7 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             AddColumn addColumn = (AddColumn) o;
-            return Objects.equals(fieldName, addColumn.fieldName)
+            return Objects.equals(fieldNames, addColumn.fieldNames)
                     && dataType.equals(addColumn.dataType)
                     && Objects.equals(description, addColumn.description)
                     && move.equals(addColumn.move);
@@ -255,7 +267,7 @@ public interface SchemaChange extends Serializable {
         @Override
         public int hashCode() {
             int result = Objects.hash(dataType, description);
-            result = 31 * result + Objects.hashCode(fieldName);
+            result = 31 * result + Objects.hashCode(fieldNames);
             result = 31 * result + Objects.hashCode(move);
             return result;
         }
@@ -308,14 +320,14 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
-        private final String fieldName;
+        private final List<String> fieldNames;
 
-        private DropColumn(String fieldName) {
-            this.fieldName = fieldName;
+        private DropColumn(List<String> fieldNames) {
+            this.fieldNames = fieldNames;
         }
 
-        public String fieldName() {
-            return fieldName;
+        public List<String> fieldNames() {
+            return fieldNames;
         }
 
         @Override
@@ -327,12 +339,12 @@ public interface SchemaChange extends Serializable {
                 return false;
             }
             DropColumn that = (DropColumn) o;
-            return Objects.equals(fieldName, that.fieldName);
+            return Objects.equals(fieldNames, that.fieldNames);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hashCode(fieldName);
+            return Objects.hashCode(fieldNames);
         }
     }
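
A minimal sketch of the new list-based factory methods above, assuming a
SchemaManager instance named manager (as exercised in SchemaManagerTest
below); each list element names one level of the nested column path:

    // Add v.f2.f3 (STRING, empty comment, no move), then drop v.f2.f1.
    SchemaChange addNested =
            SchemaChange.addColumn(
                    Arrays.asList("v", "f2", "f3"), DataTypes.STRING(), "", null);
    SchemaChange dropNested = SchemaChange.dropColumn(Arrays.asList("v", "f2", "f1"));
    manager.commitChanges(addNested);
    manager.commitChanges(dropNested);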
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index 083d131ec..b5d730707 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -22,6 +22,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.casting.CastFieldGetter;
+import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -67,8 +68,6 @@ public class SchemaEvolutionUtil {
      * data fields, -1 is the index of 6->b in data fields and 1 is the index of 3->a in data
      * fields.
      *
-     * <p>/// TODO should support nest index mapping when nest schema evolution is supported.
-     *
      * @param tableFields the fields of table
      * @param dataFields the fields of underlying data
      * @return the index mapping
@@ -394,18 +393,32 @@ public class SchemaEvolutionUtil {
                     checkState(
                             !(tableField.type() instanceof MapType
                                     || dataField.type() instanceof ArrayType
-                                    || dataField.type() instanceof MultisetType
-                                    || dataField.type() instanceof RowType),
-                            "Only support column type evolution in atomic data type.");
+                                    || dataField.type() instanceof MultisetType),
+                            "Only support column type evolution in atomic and row data type.");
+
+                    CastExecutor<?, ?> castExecutor;
+                    if (tableField.type() instanceof RowType
+                            && dataField.type() instanceof RowType) {
+                        castExecutor =
+                                createRowCastExecutor(
+                                        (RowType) dataField.type(), (RowType) tableField.type());
+                    } else {
+                        castExecutor = CastExecutors.resolve(dataField.type(), tableField.type());
+                    }
+                    checkNotNull(
+                            castExecutor,
+                            "Cannot cast from type "
+                                    + dataField.type()
+                                    + " to type "
+                                    + tableField.type());
+
                     // Create getter with index i and projected row data will convert to underlying
                     // data
                     converterMapping[i] =
                             new CastFieldGetter(
                                     InternalRowUtils.createNullCheckingFieldGetter(
                                             dataField.type(), i),
-                                    checkNotNull(
-                                            CastExecutors.resolve(
-                                                    dataField.type(), tableField.type())));
+                                    castExecutor);
                     castExist = true;
                 }
             }
@@ -413,4 +426,24 @@ public class SchemaEvolutionUtil {
 
         return castExist ? converterMapping : null;
     }
+
+    private static CastExecutor<InternalRow, InternalRow> createRowCastExecutor(
+            RowType inputType, RowType targetType) {
+        int[] indexMapping = createIndexMapping(targetType.getFields(), inputType.getFields());
+        CastFieldGetter[] castFieldGetters =
+                createCastFieldGetterMapping(
+                        targetType.getFields(), inputType.getFields(), indexMapping);
+
+        ProjectedRow projectedRow = indexMapping == null ? null : ProjectedRow.from(indexMapping);
+        CastedRow castedRow = castFieldGetters == null ? null : CastedRow.from(castFieldGetters);
+        return value -> {
+            if (projectedRow != null) {
+                value = projectedRow.replaceRow(value);
+            }
+            if (castedRow != null) {
+                value = castedRow.replaceRow(value);
+            }
+            return value;
+        };
+    }
 }
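
A minimal sketch of how the composed executor above is used; oldRowType,
newRowType and dataRow are illustrative, and cast(...) is assumed to be the
functional method that the returned lambda implements:

    // Project the underlying row to the table's field order, then cast any
    // evolved fields; both wrapper rows are created once and reused.
    CastExecutor<InternalRow, InternalRow> executor =
            createRowCastExecutor(oldRowType, newRowType);
    InternalRow evolved = executor.cast(dataRow);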
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 7b987b049..6b4127cee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -40,7 +40,6 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeCasts;
-import org.apache.paimon.types.DataTypeVisitor;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
@@ -282,44 +281,52 @@ public class SchemaManager implements Serializable {
                 } else if (change instanceof AddColumn) {
                     AddColumn addColumn = (AddColumn) change;
                     SchemaChange.Move move = addColumn.move();
-                    if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
-                        throw new Catalog.ColumnAlreadyExistException(
-                                identifierFromPath(tableRoot.toString(), true, branch),
-                                addColumn.fieldName());
-                    }
                     Preconditions.checkArgument(
                             addColumn.dataType().isNullable(),
                             "Column %s cannot specify NOT NULL in the %s 
table.",
-                            addColumn.fieldName(),
+                            String.join(".", addColumn.fieldNames()),
                             identifierFromPath(tableRoot.toString(), true, branch).getFullName());
                     int id = highestFieldId.incrementAndGet();
                     DataType dataType =
                             ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
 
-                    DataField dataField =
-                            new DataField(
-                                    id, addColumn.fieldName(), dataType, addColumn.description());
-
-                    // key: name ; value : index
-                    Map<String, Integer> map = new HashMap<>();
-                    for (int i = 0; i < newFields.size(); i++) {
-                        map.put(newFields.get(i).name(), i);
-                    }
-
-                    if (null != move) {
-                        if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
-                            newFields.add(0, dataField);
-                        } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
-                            int fieldIndex = map.get(move.referenceFieldName());
-                            newFields.add(fieldIndex + 1, dataField);
+                    new NestedColumnModifier<Catalog.ColumnAlreadyExistException>(
+                            addColumn.fieldNames().toArray(new String[0])) {
+                        @Override
+                        protected void updateLastColumn(List<DataField> newFields, String fieldName)
+                                throws Catalog.ColumnAlreadyExistException {
+                            for (DataField field : newFields) {
+                                if (field.name().equals(fieldName)) {
+                                    throw new Catalog.ColumnAlreadyExistException(
+                                            identifierFromPath(tableRoot.toString(), true, branch),
+                                            String.join(".", addColumn.fieldNames()));
+                                }
+                            }
+
+                            DataField dataField =
+                                    new DataField(id, fieldName, dataType, addColumn.description());
+
+                            // key: name ; value : index
+                            Map<String, Integer> map = new HashMap<>();
+                            for (int i = 0; i < newFields.size(); i++) {
+                                map.put(newFields.get(i).name(), i);
+                            }
+
+                            if (null != move) {
+                                if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                                    newFields.add(0, dataField);
+                                } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                                    int fieldIndex = map.get(move.referenceFieldName());
+                                    newFields.add(fieldIndex + 1, dataField);
+                                }
+                            } else {
+                                newFields.add(dataField);
+                            }
                         }
-                    } else {
-                        newFields.add(dataField);
-                    }
-
+                    }.updateIntermediateColumn(newFields, 0);
                 } else if (change instanceof RenameColumn) {
                     RenameColumn rename = (RenameColumn) change;
-                    columnChangeValidation(oldTableSchema, change);
+                    renameColumnValidation(oldTableSchema, rename);
                     if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
                                 identifierFromPath(tableRoot.toString(), true, branch),
@@ -329,7 +336,6 @@ public class SchemaManager implements Serializable {
                     updateNestedColumn(
                             newFields,
                             new String[] {rename.fieldName()},
-                            0,
                             (field) ->
                                     new DataField(
                                             field.id(),
@@ -338,16 +344,23 @@ public class SchemaManager implements Serializable {
                                             field.description()));
                 } else if (change instanceof DropColumn) {
                     DropColumn drop = (DropColumn) change;
-                    columnChangeValidation(oldTableSchema, change);
-                    if (!newFields.removeIf(
-                            f -> f.name().equals(((DropColumn) change).fieldName()))) {
-                        throw new Catalog.ColumnNotExistException(
-                                identifierFromPath(tableRoot.toString(), true, branch),
-                                drop.fieldName());
-                    }
-                    if (newFields.isEmpty()) {
-                        throw new IllegalArgumentException("Cannot drop all fields in table");
-                    }
+                    dropColumnValidation(oldTableSchema, drop);
+                    new NestedColumnModifier<Catalog.ColumnNotExistException>(
+                            drop.fieldNames().toArray(new String[0])) {
+                        @Override
+                        protected void updateLastColumn(List<DataField> newFields, String fieldName)
+                                throws Catalog.ColumnNotExistException {
+                            if (!newFields.removeIf(f -> f.name().equals(fieldName))) {
+                                throw new Catalog.ColumnNotExistException(
+                                        identifierFromPath(tableRoot.toString(), true, branch),
+                                        String.join(".", drop.fieldNames()));
+                            }
+                            if (newFields.isEmpty()) {
+                                throw new IllegalArgumentException(
+                                        "Cannot drop all fields in table");
+                            }
+                        }
+                    }.updateIntermediateColumn(newFields, 0);
                 } else if (change instanceof UpdateColumnType) {
                     UpdateColumnType update = (UpdateColumnType) change;
                     if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
@@ -356,9 +369,9 @@ public class SchemaManager implements Serializable {
                                         "Cannot update partition column [%s] 
type in the table[%s].",
                                         update.fieldName(), 
tableRoot.getName()));
                     }
-                    updateColumn(
+                    updateNestedColumn(
                             newFields,
-                            update.fieldName(),
+                            new String[] {update.fieldName()},
                             (field) -> {
                                 DataType targetType = update.newDataType();
                                 if (update.keepNullability()) {
@@ -392,7 +405,6 @@ public class SchemaManager implements Serializable {
                     updateNestedColumn(
                             newFields,
                             update.fieldNames(),
-                            0,
                             (field) ->
                                     new DataField(
                                             field.id(),
@@ -404,7 +416,6 @@ public class SchemaManager implements Serializable {
                     updateNestedColumn(
                             newFields,
                             update.fieldNames(),
-                            0,
                             (field) ->
                                     new DataField(
                                             field.id(),
@@ -569,74 +580,96 @@ public class SchemaManager implements Serializable {
                 .collect(Collectors.toList());
     }
 
-    private static void columnChangeValidation(TableSchema schema, SchemaChange change) {
-        /// TODO support partition and primary keys schema evolution
-        if (change instanceof DropColumn) {
-            String columnToDrop = ((DropColumn) change).fieldName();
-            if (schema.partitionKeys().contains(columnToDrop)
-                    || schema.primaryKeys().contains(columnToDrop)) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Cannot drop partition key or primary key: [%s]", columnToDrop));
-            }
-        } else if (change instanceof RenameColumn) {
-            String columnToRename = ((RenameColumn) change).fieldName();
-            if (schema.partitionKeys().contains(columnToRename)) {
-                throw new UnsupportedOperationException(
-                        String.format("Cannot rename partition column: [%s]", columnToRename));
-            }
-        } else {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Validation for %s is not supported",
-                            change.getClass().getSimpleName()));
+    private static void dropColumnValidation(TableSchema schema, DropColumn change) {
+        // primary keys and partition keys can't be nested columns
+        if (change.fieldNames().size() > 1) {
+            return;
+        }
+        String columnToDrop = change.fieldNames().get(0);
+        if (schema.partitionKeys().contains(columnToDrop)
+                || schema.primaryKeys().contains(columnToDrop)) {
+            throw new UnsupportedOperationException(
+                    String.format("Cannot drop partition key or primary key: [%s]", columnToDrop));
         }
     }
 
-    /** This method is hacky, newFields may be immutable. We should use {@link DataTypeVisitor}. */
-    private void updateNestedColumn(
-            List<DataField> newFields,
-            String[] updateFieldNames,
-            int index,
-            Function<DataField, DataField> updateFunc)
-            throws Catalog.ColumnNotExistException {
-        boolean found = false;
-        for (int i = 0; i < newFields.size(); i++) {
-            DataField field = newFields.get(i);
-            if (field.name().equals(updateFieldNames[index])) {
-                found = true;
-                if (index == updateFieldNames.length - 1) {
-                    newFields.set(i, updateFunc.apply(field));
-                    break;
-                } else {
-                    List<DataField> nestedFields =
-                            new ArrayList<>(
-                                    ((org.apache.paimon.types.RowType) field.type()).getFields());
-                    updateNestedColumn(nestedFields, updateFieldNames, index + 1, updateFunc);
-                    newFields.set(
-                            i,
-                            new DataField(
-                                    field.id(),
-                                    field.name(),
-                                    new org.apache.paimon.types.RowType(
-                                            field.type().isNullable(), nestedFields),
-                                    field.description()));
+    private static void renameColumnValidation(TableSchema schema, RenameColumn change) {
+        String columnToRename = change.fieldName();
+        if (schema.partitionKeys().contains(columnToRename)) {
+            throw new UnsupportedOperationException(
+                    String.format("Cannot rename partition column: [%s]", columnToRename));
+        }
+    }
+
+    private abstract class NestedColumnModifier<E extends Exception> {
+
+        private final String[] updateFieldNames;
+
+        private NestedColumnModifier(String[] updateFieldNames) {
+            this.updateFieldNames = updateFieldNames;
+        }
+
+        public void updateIntermediateColumn(List<DataField> newFields, int depth)
+                throws Catalog.ColumnNotExistException, E {
+            if (depth == updateFieldNames.length - 1) {
+                updateLastColumn(newFields, updateFieldNames[depth]);
+                return;
+            }
+
+            for (int i = 0; i < newFields.size(); i++) {
+                DataField field = newFields.get(i);
+                if (!field.name().equals(updateFieldNames[depth])) {
+                    continue;
                 }
+
+                List<DataField> nestedFields =
+                        new ArrayList<>(
+                                ((org.apache.paimon.types.RowType) field.type()).getFields());
+                updateIntermediateColumn(nestedFields, depth + 1);
+                newFields.set(
+                        i,
+                        new DataField(
+                                field.id(),
+                                field.name(),
+                                new org.apache.paimon.types.RowType(
+                                        field.type().isNullable(), nestedFields),
+                                field.description()));
+                return;
             }
-        }
-        if (!found) {
+
             throw new Catalog.ColumnNotExistException(
                     identifierFromPath(tableRoot.toString(), true, branch),
-                    Arrays.toString(updateFieldNames));
+                    String.join(".", Arrays.asList(updateFieldNames).subList(0, depth + 1)));
         }
+
+        protected abstract void updateLastColumn(List<DataField> newFields, String fieldName)
+                throws E;
     }
 
-    private void updateColumn(
+    private void updateNestedColumn(
             List<DataField> newFields,
-            String updateFieldName,
+            String[] updateFieldNames,
             Function<DataField, DataField> updateFunc)
             throws Catalog.ColumnNotExistException {
-        updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc);
+        new NestedColumnModifier<Catalog.ColumnNotExistException>(updateFieldNames) {
+            @Override
+            protected void updateLastColumn(List<DataField> newFields, String fieldName)
+                    throws Catalog.ColumnNotExistException {
+                for (int i = 0; i < newFields.size(); i++) {
+                    DataField field = newFields.get(i);
+                    if (!field.name().equals(fieldName)) {
+                        continue;
+                    }
+
+                    newFields.set(i, updateFunc.apply(field));
+                    return;
+                }
+
+                throw new Catalog.ColumnNotExistException(
+                        identifierFromPath(tableRoot.toString(), true, branch),
+                        String.join(".", updateFieldNames));
+            }
+        }.updateIntermediateColumn(newFields, 0);
     }
 
     @VisibleForTesting
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index dbeedcfe5..643e1372b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -541,7 +541,7 @@ public abstract class CatalogTestBase {
                 .satisfies(
                         anyCauseMatches(
                                 Catalog.ColumnNotExistException.class,
-                                "Column [non_existing_col] does not exist in the test_db.test_table table."));
+                                "Column non_existing_col does not exist in the test_db.test_table table."));
     }
 
     @Test
@@ -647,7 +647,7 @@ public abstract class CatalogTestBase {
                 .satisfies(
                         anyCauseMatches(
                                 Catalog.ColumnNotExistException.class,
-                                "Column [non_existing_col] does not exist in the test_db.test_table table."));
+                                "Column non_existing_col does not exist in the test_db.test_table table."));
         // Alter table update a column type throws Exception when column is partition columns
         assertThatThrownBy(
                         () ->
@@ -718,7 +718,7 @@ public abstract class CatalogTestBase {
                 .satisfies(
                         anyCauseMatches(
                                 Catalog.ColumnNotExistException.class,
-                                "Column [non_existing_col] does not exist in the test_db.test_table table."));
+                                "Column non_existing_col does not exist in the test_db.test_table table."));
     }
 
     @Test
@@ -774,7 +774,7 @@ public abstract class CatalogTestBase {
                 .satisfies(
                         anyCauseMatches(
                                 Catalog.ColumnNotExistException.class,
-                                "Column [non_existing_col] does not exist in the test_db.test_table table."));
+                                "Column non_existing_col does not exist in the test_db.test_table table."));
 
         // Alter table update a column nullability throws Exception when column is pk columns
         assertThatThrownBy(
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 4bd965268..1a175de24 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -65,6 +65,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link SchemaManager}. */
@@ -527,4 +528,82 @@ public class SchemaManagerTest {
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("Change 'merge-engine' is not supported yet.");
     }
+
+    @Test
+    public void testAddAndDropNestedColumns() throws Exception {
+        RowType innerType =
+                RowType.of(
+                        new DataField(4, "f1", DataTypes.INT()),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        RowType middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        RowType outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));
+
+        Schema schema =
+                new Schema(
+                        outerType.getFields(),
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+        manager.createTable(schema);
+
+        SchemaChange addColumn =
+                SchemaChange.addColumn(
+                        Arrays.asList("v", "f2", "f3"),
+                        DataTypes.STRING(),
+                        "",
+                        SchemaChange.Move.after("f3", "f1"));
+        manager.commitChanges(addColumn);
+
+        innerType =
+                RowType.of(
+                        new DataField(4, "f1", DataTypes.INT()),
+                        new DataField(6, "f3", DataTypes.STRING(), ""),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));
+        assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+        assertThatCode(() -> manager.commitChanges(addColumn))
+                .hasMessageContaining("Column v.f2.f3 already exists");
+        SchemaChange middleColumnNotExistAddColumn =
+                SchemaChange.addColumn(
+                        Arrays.asList("v", "invalid", "f4"), DataTypes.STRING(), "", null);
+        assertThatCode(() -> manager.commitChanges(middleColumnNotExistAddColumn))
+                .hasMessageContaining("Column v.invalid does not exist");
+
+        SchemaChange dropColumn = SchemaChange.dropColumn(Arrays.asList("v", "f2", "f1"));
+        manager.commitChanges(dropColumn);
+
+        innerType =
+                RowType.of(
+                        new DataField(6, "f3", DataTypes.STRING(), ""),
+                        new DataField(5, "f2", DataTypes.BIGINT()));
+        middleType =
+                RowType.of(
+                        new DataField(2, "f1", DataTypes.STRING()),
+                        new DataField(3, "f2", innerType));
+        outerType =
+                RowType.of(
+                        new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));
+        assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+        assertThatCode(() -> manager.commitChanges(dropColumn))
+                .hasMessageContaining("Column v.f2.f1 does not exist");
+        SchemaChange middleColumnNotExistDropColumn =
+                SchemaChange.dropColumn(Arrays.asList("v", "invalid", "f2"));
+        assertThatCode(() -> manager.commitChanges(middleColumnNotExistDropColumn))
+                .hasMessageContaining("Column v.invalid does not exist");
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index b500da8f1..2ac1d032c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -371,10 +371,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
             }
         } else if (change instanceof TableChange.AddColumn) {
             TableChange.AddColumn add = (TableChange.AddColumn) change;
-            validateAlterNestedField(add.fieldNames());
             SchemaChange.Move move = getMove(add.position(), add.fieldNames());
             return SchemaChange.addColumn(
-                    add.fieldNames()[0],
+                    Arrays.asList(add.fieldNames()),
                     toPaimonType(add.dataType()).copy(add.isNullable()),
                     add.comment(),
                     move);
@@ -384,8 +383,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
             return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName());
         } else if (change instanceof TableChange.DeleteColumn) {
             TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change;
-            validateAlterNestedField(delete.fieldNames());
-            return SchemaChange.dropColumn(delete.fieldNames()[0]);
+            return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
         } else if (change instanceof TableChange.UpdateColumnType) {
             TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
             validateAlterNestedField(update.fieldNames());
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 32c3498a7..b4565447c 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -456,6 +456,11 @@ public class SparkReadITCase extends SparkReadTestBase {
                 "INSERT INTO paimon.default."
                         + tableName
                         + " VALUES (2, STRUCT(20, STRUCT('banana', 200)))");
+        assertThat(
+                        spark.sql("SELECT v.f2.f1, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
         spark.sql(
                 "INSERT INTO paimon.default."
                         + tableName
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 9d958931c..e876a0027 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -23,6 +23,8 @@ import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
 import java.util.List;
@@ -705,4 +707,85 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                         ","))
                 .collect(Collectors.toList());
     }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testAddAndDropNestedColumn(String formatType) {
+        String tableName = "testAddNestedColumnTable";
+        spark.sql(
+                "CREATE TABLE paimon.default."
+                        + tableName
+                        + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) "
+                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
+                        + formatType
+                        + "')");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[1,[10,[apple,100]]]", "[2,[20,[banana,200]]]");
+        assertThat(
+                        spark.sql("SELECT v.f2.f1, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
+
+        spark.sql("ALTER TABLE paimon.default." + tableName + " ADD COLUMN 
v.f3 STRING");
+        spark.sql("ALTER TABLE paimon.default." + tableName + " ADD COLUMN 
v.f2.f3 BIGINT");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101, 1001), 'one')), (3, STRUCT(31, STRUCT('CHERRY', 301, 3001), 'three'))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,[11,[APPLE,101,1001],one]]",
+                        "[2,[20,[banana,200,null],null]]",
+                        "[3,[31,[CHERRY,301,3001],three]]");
+        assertThat(
+                        spark.sql("SELECT v.f2.f2, v.f3, k FROM paimon.default." + tableName)
+                                .collectAsList().stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder("[101,one,1]", "[200,null,2]", "[301,three,3]");
+
+        spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN 
v.f2.f1");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(12, STRUCT(102, 1002), 'one')), (4, STRUCT(42, STRUCT(402, 4002), 'four'))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,[12,[102,1002],one]]",
+                        "[2,[20,[200,null],null]]",
+                        "[3,[31,[301,3001],three]]",
+                        "[4,[42,[402,4002],four]]");
+
+        spark.sql(
+                "ALTER TABLE paimon.default."
+                        + tableName
+                        + " ADD COLUMN v.f2.f1 DECIMAL(5, 2) AFTER f2");
+        spark.sql(
+                "INSERT INTO paimon.default."
+                        + tableName
+                        + " VALUES (1, STRUCT(13, STRUCT(103, 100.03, 1003), 'one')), (5, STRUCT(53, STRUCT(503, 500.03, 5003), 'five'))");
+        assertThat(
+                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
+                                .stream()
+                                .map(Row::toString))
+                .containsExactlyInAnyOrder(
+                        "[1,[13,[103,100.03,1003],one]]",
+                        "[2,[20,[200,null,null],null]]",
+                        "[3,[31,[301,null,3001],three]]",
+                        "[4,[42,[402,null,4002],four]]",
+                        "[5,[53,[503,500.03,5003],five]]");
+    }
 }

