This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c0eb6a87 [#3136] fix(spark-connector): support replace column with 
same column name for spark connector (#3461)
2c0eb6a87 is described below

commit 2c0eb6a873bf1bb65dad0234eb0b699ba892ebae
Author: FANNG <[email protected]>
AuthorDate: Mon Jul 29 09:58:03 2024 +0800

    [#3136] fix(spark-connector): support replace column with same column name 
for spark connector (#3461)
    
    ### What changes were proposed in this pull request?
    To support replacing a column:
    * support deleting and then adding a column with the **same column name**
    in the valid-column-change logic in the Hive catalog
    * remove the get-column-position logic when no column position is
    specified in the Iceberg catalog
    
    ### Why are the changes needed?
    
    Fix: #3136
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    add IT
---
 .../catalog/hive/HiveCatalogOperations.java        | 14 ++++---
 .../iceberg/ops/IcebergTableOpsHelper.java         | 44 +++++++++-------------
 .../integration/test/CatalogIcebergBaseIT.java     | 37 ++++++++++++++++++
 gradle.properties                                  |  2 +-
 .../connector/integration/test/SparkCommonIT.java  | 41 ++++++++++++++++++++
 .../integration/test/hive/SparkHiveCatalogIT.java  |  5 +++
 .../test/iceberg/SparkIcebergCatalogIT.java        |  5 +++
 .../integration/test/util/SparkUtilIT.java         |  3 ++
 8 files changed, 118 insertions(+), 33 deletions(-)

diff --git 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
index a7f70b97e..afae1cbbd 100644
--- 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
+++ 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
@@ -674,11 +674,11 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
         .filter(c -> c instanceof TableChange.ColumnChange)
         .forEach(
             c -> {
-              String fieldToAdd = String.join(".", ((TableChange.ColumnChange) 
c).fieldName());
+              String fieldName = String.join(".", ((TableChange.ColumnChange) 
c).fieldName());
               Preconditions.checkArgument(
                   c instanceof TableChange.UpdateColumnComment
-                      || !partitionFields.contains(fieldToAdd),
-                  "Cannot alter partition column: " + fieldToAdd);
+                      || !partitionFields.contains(fieldName),
+                  "Cannot alter partition column: " + fieldName);
 
               if (c instanceof TableChange.UpdateColumnPosition
                   && afterPartitionColumn(
@@ -687,12 +687,16 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
                     "Cannot alter column position to after partition column");
               }
 
+              if (c instanceof TableChange.DeleteColumn) {
+                existingFields.remove(fieldName);
+              }
+
               if (c instanceof TableChange.AddColumn) {
                 TableChange.AddColumn addColumn = (TableChange.AddColumn) c;
 
-                if (existingFields.contains(fieldToAdd)) {
+                if (existingFields.contains(fieldName)) {
                   throw new IllegalArgumentException(
-                      "Cannot add column with duplicate name: " + fieldToAdd);
+                      "Cannot add column with duplicate name: " + fieldName);
                 }
 
                 if (addColumn.getPosition() == null) {
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
index 176e2cbbe..86da0eaea 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/IcebergTableOpsHelper.java
@@ -133,9 +133,13 @@ public class IcebergTableOpsHelper {
   }
 
   private void doUpdateColumnPosition(
-      UpdateSchema icebergUpdateSchema, UpdateColumnPosition 
updateColumnPosition) {
-    doMoveColumn(
-        icebergUpdateSchema, updateColumnPosition.fieldName(), 
updateColumnPosition.getPosition());
+      UpdateSchema icebergUpdateSchema,
+      UpdateColumnPosition updateColumnPosition,
+      Schema icebergTableSchema) {
+    StructType tableSchema = icebergTableSchema.asStruct();
+    ColumnPosition columnPosition =
+        getColumnPositionForIceberg(tableSchema, 
updateColumnPosition.getPosition());
+    doMoveColumn(icebergUpdateSchema, updateColumnPosition.fieldName(), 
columnPosition);
   }
 
   private void doUpdateColumnType(
@@ -156,7 +160,9 @@ public class IcebergTableOpsHelper {
     icebergUpdateSchema.updateColumn(fieldName, (PrimitiveType) type);
   }
 
-  private ColumnPosition getAddColumnPosition(StructType parent, 
ColumnPosition columnPosition) {
+  // Iceberg doesn't support LAST position, transform to FIRST or AFTER.
+  private ColumnPosition getColumnPositionForIceberg(
+      StructType parent, ColumnPosition columnPosition) {
     if (!(columnPosition instanceof TableChange.Default)) {
       return columnPosition;
     }
@@ -171,25 +177,7 @@ public class IcebergTableOpsHelper {
     return ColumnPosition.after(last.name());
   }
 
-  private void doAddColumn(
-      UpdateSchema icebergUpdateSchema, AddColumn addColumn, Schema 
icebergTableSchema) {
-    String parentName = getParentName(addColumn.fieldName());
-    StructType parentStruct;
-    if (parentName != null) {
-      org.apache.iceberg.types.Type parent = 
icebergTableSchema.findType(parentName);
-      Preconditions.checkArgument(
-          parent != null, "Couldn't find parent field: " + parentName + " in 
Iceberg table");
-      Preconditions.checkArgument(
-          parent instanceof StructType,
-          "Couldn't add column to non-struct field, name:"
-              + parentName
-              + ", type:"
-              + parent.getClass().getSimpleName());
-      parentStruct = (StructType) parent;
-    } else {
-      parentStruct = icebergTableSchema.asStruct();
-    }
-
+  private void doAddColumn(UpdateSchema icebergUpdateSchema, AddColumn 
addColumn) {
     if (addColumn.isAutoIncrement()) {
       throw new IllegalArgumentException("Iceberg doesn't support auto 
increment column");
     }
@@ -210,8 +198,9 @@ public class IcebergTableOpsHelper {
           addColumn.getComment());
     }
 
-    ColumnPosition position = getAddColumnPosition(parentStruct, 
addColumn.getPosition());
-    doMoveColumn(icebergUpdateSchema, addColumn.fieldName(), position);
+    if (!ColumnPosition.defaultPos().equals(addColumn.getPosition())) {
+      doMoveColumn(icebergUpdateSchema, addColumn.fieldName(), 
addColumn.getPosition());
+    }
   }
 
   private void alterTableProperty(
@@ -237,11 +226,12 @@ public class IcebergTableOpsHelper {
       Schema icebergTableSchema) {
     for (ColumnChange change : columnChanges) {
       if (change instanceof AddColumn) {
-        doAddColumn(icebergUpdateSchema, (AddColumn) change, 
icebergTableSchema);
+        doAddColumn(icebergUpdateSchema, (AddColumn) change);
       } else if (change instanceof DeleteColumn) {
         doDeleteColumn(icebergUpdateSchema, (DeleteColumn) change, 
icebergTableSchema);
       } else if (change instanceof UpdateColumnPosition) {
-        doUpdateColumnPosition(icebergUpdateSchema, (UpdateColumnPosition) 
change);
+        doUpdateColumnPosition(
+            icebergUpdateSchema, (UpdateColumnPosition) change, 
icebergTableSchema);
       } else if (change instanceof RenameColumn) {
         doRenameColumn(icebergUpdateSchema, (RenameColumn) change);
       } else if (change instanceof UpdateColumnType) {
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index 078fe803c..7fd503550 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -584,6 +584,43 @@ public abstract class CatalogIcebergBaseIT extends 
AbstractIT {
     Assertions.assertEquals(0, tableIdentifiers.size());
   }
 
+  @Test
+  public void testUpdateIcebergColumnDefaultPosition() {
+    Column col1 = Column.of("name", Types.StringType.get(), "comment");
+    Column col2 = Column.of("address", Types.StringType.get(), "comment");
+    Column col3 = Column.of("date_of_birth", Types.StringType.get(), 
"comment");
+    Column[] newColumns = new Column[] {col1, col2, col3};
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("CatalogIcebergIT_table"));
+    catalog
+        .asTableCatalog()
+        .createTable(
+            tableIdentifier,
+            newColumns,
+            table_comment,
+            ImmutableMap.of(),
+            Transforms.EMPTY_TRANSFORM,
+            Distributions.NONE,
+            new SortOrder[0]);
+
+    catalog
+        .asTableCatalog()
+        .alterTable(
+            tableIdentifier,
+            TableChange.updateColumnPosition(
+                new String[] {col1.name()}, 
TableChange.ColumnPosition.defaultPos()));
+
+    Table updateColumnPositionTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+
+    Column[] updateCols = updateColumnPositionTable.columns();
+    Assertions.assertEquals(3, updateCols.length);
+    Assertions.assertEquals(col2.name(), updateCols[0].name());
+    Assertions.assertEquals(col3.name(), updateCols[1].name());
+    Assertions.assertEquals(col1.name(), updateCols[2].name());
+
+    catalog.asTableCatalog().dropTable(tableIdentifier);
+  }
+
   @Test
   public void testAlterIcebergTable() {
     Column[] columns = createColumns();
diff --git a/gradle.properties b/gradle.properties
index b4deeef33..5b36e4623 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -40,4 +40,4 @@ defaultScalaVersion = 2.12
 pythonVersion = 3.8
 
 # skipDockerTests is used to skip the tests that require Docker to be running.
-skipDockerTests = true
+skipDockerTests = false
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index a67fcd218..63e4801ef 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -115,6 +115,8 @@ public abstract class SparkCommonIT extends SparkEnvIT {
 
   protected abstract boolean supportsDelete();
 
+  protected abstract boolean supportsSchemaEvolution();
+
   // Use a custom database not the original default database because 
SparkCommonIT couldn't
   // read&write data to tables in default database. The main reason is default 
database location is
   // determined by `hive.metastore.warehouse.dir` in hive-site.xml which is 
local HDFS address
@@ -547,6 +549,45 @@ public abstract class SparkCommonIT extends SparkEnvIT {
     checkTableColumns(tableName, updateCommentColumns, 
getTableInfo(tableName));
   }
 
+  @Test
+  void testAlterTableReplaceColumns() {
+    String tableName = "test_replace_columns_table";
+    dropTableIfExists(tableName);
+
+    createSimpleTable(tableName);
+    List<SparkColumnInfo> simpleTableColumns = getSimpleTableColumn();
+    SparkTableInfo tableInfo = getTableInfo(tableName);
+    checkTableColumns(tableName, simpleTableColumns, tableInfo);
+    checkTableReadWrite(tableInfo);
+    String firstLine = getExpectedTableData(tableInfo);
+
+    sql(
+        String.format(
+            "ALTER TABLE %S REPLACE COLUMNS (id int COMMENT 'new comment', 
name2 string, age long);",
+            tableName));
+    ArrayList<SparkColumnInfo> updateColumns = new ArrayList<>();
+    // change comment for id
+    updateColumns.add(SparkColumnInfo.of("id", DataTypes.IntegerType, "new 
comment"));
+    // change column name
+    updateColumns.add(SparkColumnInfo.of("name2", DataTypes.StringType, null));
+    // change column type
+    updateColumns.add(SparkColumnInfo.of("age", DataTypes.LongType, null));
+
+    tableInfo = getTableInfo(tableName);
+    checkTableColumns(tableName, updateColumns, tableInfo);
+    sql(String.format("INSERT INTO %S VALUES(3, 'name2', 10)", tableName));
+    List<String> data = getQueryData(String.format("SELECT * from %s ORDER BY 
id", tableName));
+    Assertions.assertEquals(2, data.size());
+    if (supportsSchemaEvolution()) {
+      // It's different columns for Iceberg if delete and add a column with 
same name.
+      Assertions.assertEquals(
+          String.join(",", Arrays.asList(NULL_STRING, NULL_STRING, 
NULL_STRING)), data.get(0));
+    } else {
+      Assertions.assertEquals(firstLine, data.get(0));
+    }
+    Assertions.assertEquals("3,name2,10", data.get(1));
+  }
+
   @Test
   void testComplexType() {
     String tableName = "complex_type_table";
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
index a4ce99811..5680e2e30 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/hive/SparkHiveCatalogIT.java
@@ -76,6 +76,11 @@ public abstract class SparkHiveCatalogIT extends 
SparkCommonIT {
     return false;
   }
 
+  @Override
+  protected boolean supportsSchemaEvolution() {
+    return false;
+  }
+
   @Test
   void testCreateHiveFormatPartitionTable() {
     String tableName = "hive_partition_table";
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index 4df4992b5..52f4abf3a 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -99,6 +99,11 @@ public abstract class SparkIcebergCatalogIT extends 
SparkCommonIT {
     return true;
   }
 
+  @Override
+  protected boolean supportsSchemaEvolution() {
+    return true;
+  }
+
   @Override
   protected String getTableLocation(SparkTableInfo table) {
     return String.join(File.separator, table.getTableLocation(), "data");
diff --git 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
index 3a25cb022..05637c0ce 100644
--- 
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
+++ 
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/util/SparkUtilIT.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.api.Assertions;
  * <p>Referred from 
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
  */
 public abstract class SparkUtilIT extends AbstractIT {
+  protected static final String NULL_STRING = "NULL";
 
   protected abstract SparkSession getSparkSession();
 
@@ -103,6 +104,8 @@ public abstract class SparkUtilIT extends AbstractIT {
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       sdf.setTimeZone(TimeZone.getTimeZone(TIME_ZONE_UTC));
       return sdf.format(timestamp);
+    } else if (item == null) {
+      return NULL_STRING;
     } else {
       return item.toString();
     }

Reply via email to