This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed1786d5bdc [HUDI-6865] Fix InternalSchema schemaId when column is dropped (#9724)
ed1786d5bdc is described below

commit ed1786d5bdc210c8d516bb103130265f912c86d3
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Sep 18 09:38:49 2023 +0530

    [HUDI-6865] Fix InternalSchema schemaId when column is dropped (#9724)
---
 .../hudi/internal/schema/InternalSchema.java       |  9 +++--
 .../schema/action/InternalSchemaMerger.java        |  6 ++--
 .../internal/schema/utils/InternalSchemaUtils.java | 12 +++----
 .../internal/schema/action/TestMergeSchema.java    | 38 +++++++++++-----------
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 25 +++++++++++++-
 .../spark/sql/hudi/command/AlterTableCommand.scala |  9 ++---
 6 files changed, 61 insertions(+), 38 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
index 237eb95285c..ce5f8f259da 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
@@ -158,12 +158,12 @@ public class InternalSchema implements Serializable {
   }
 
   /**
-   * Returns the {@link Type} of a sub-field identified by the field name.
+   * Returns the fully qualified name of the field corresponding to the given 
id.
    *
    * @param id a field id
-   * @return fullName of field of
+   * @return full name of field corresponding to id
    */
-  public String findfullName(int id) {
+  public String findFullName(int id) {
     if (idToName == null) {
       buildIdToName();
     }
@@ -272,8 +272,7 @@ public class InternalSchema implements Serializable {
   public String toString() {
     return String.format("table {\n%s\n}",
         StringUtils.join(record.fields().stream()
-            .map(f -> " " + f)
-            .collect(Collectors.toList()).toArray(new String[0]), "\n"));
+            .map(f -> " " + f).toArray(String[]::new), "\n"));
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
index 17a53d8139d..9ed55a7e573 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
@@ -116,9 +116,9 @@ public class InternalSchemaMerger {
       Type newType = newTypes.get(i);
       Types.Field oldField = oldFields.get(i);
       int fieldId = oldField.fieldId();
-      String fullName = querySchema.findfullName(fieldId);
+      String fullName = querySchema.findFullName(fieldId);
       if (fileSchema.findField(fieldId) != null) {
-        if (fileSchema.findfullName(fieldId).equals(fullName)) {
+        if (fileSchema.findFullName(fieldId).equals(fullName)) {
           // maybe col type changed, deal with it.
           newFields.add(Types.Field.get(oldField.fieldId(), 
oldField.isOptional(), oldField.name(), newType, oldField.doc()));
         } else {
@@ -173,7 +173,7 @@ public class InternalSchemaMerger {
       }
       String parentName = sb.toString();
       int parentFieldIdFromQuerySchema = querySchema.findIdByName(parentName);
-      String parentNameFromFileSchema = 
fileSchema.findfullName(parentFieldIdFromQuerySchema);
+      String parentNameFromFileSchema = 
fileSchema.findFullName(parentFieldIdFromQuerySchema);
       if (parentNameFromFileSchema.isEmpty()) {
         break;
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index cf66986e155..94e72ff7180 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -184,13 +184,13 @@ public class InternalSchemaUtils {
       // the read file does not contain current col, so current colFilter is 
invalid
       return "";
     } else {
-      if (name.equals(fileSchema.findfullName(nameId))) {
+      if (name.equals(fileSchema.findFullName(nameId))) {
         // no change happened on current col
         return name;
       } else {
         // find rename operation on current col
         // return the name from fileSchema
-        return fileSchema.findfullName(nameId);
+        return fileSchema.findFullName(nameId);
       }
     }
   }
@@ -210,8 +210,8 @@ public class InternalSchemaUtils {
     Map<Integer, Pair<Type, Type>> result = new HashMap<>();
     ids.stream().filter(f -> otherIds.contains(f)).forEach(f -> {
       if (!schema.findType(f).equals(oldSchema.findType(f))) {
-        String[] fieldNameParts = schema.findfullName(f).split("\\.");
-        String[] otherFieldNameParts = oldSchema.findfullName(f).split("\\.");
+        String[] fieldNameParts = schema.findFullName(f).split("\\.");
+        String[] otherFieldNameParts = oldSchema.findFullName(f).split("\\.");
         String parentName = fieldNameParts[0];
         String otherParentName = otherFieldNameParts[0];
         if (fieldNameParts.length == otherFieldNameParts.length && 
schema.findIdByName(parentName) == oldSchema.findIdByName(otherParentName)) {
@@ -280,8 +280,8 @@ public class InternalSchemaUtils {
     return colNamesFromWriteSchema.stream().filter(f -> {
       int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
       // try to find the cols which has the same id, but have different 
colName;
-      return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && 
!newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
-    }).collect(Collectors.toMap(e -> 
newSchema.findfullName(oldSchema.findIdByName(e)), e -> {
+      return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && 
!newSchema.findFullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
+    }).collect(Collectors.toMap(e -> 
newSchema.findFullName(oldSchema.findIdByName(e)), e -> {
       int lastDotIndex = e.lastIndexOf(".");
       return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1);
     }));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
index 5a311c239dc..5240179fb8c 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
@@ -22,11 +22,12 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 /**
  * Tests {@link InternalSchemaMerger}.
  */
@@ -34,11 +35,11 @@ public class TestMergeSchema {
 
   @Test
   public void testPrimitiveMerge() {
-    Types.RecordType record = Types.RecordType.get(Arrays.asList(new 
Types.Field[] {
+    Types.RecordType record = Types.RecordType.get(Arrays.asList(
         Types.Field.get(0, "col1", Types.BooleanType.get()),
         Types.Field.get(1, "col2", Types.IntType.get()),
         Types.Field.get(2, "col3", Types.LongType.get()),
-        Types.Field.get(3, "col4", Types.FloatType.get())}));
+        Types.Field.get(3, "col4", Types.FloatType.get())));
 
     InternalSchema oldSchema = new InternalSchema(record);
     // add c1 after 'col1', and c2 before 'col3'
@@ -52,6 +53,7 @@ public class TestMergeSchema {
     deleteChange.deleteColumn("col1");
     deleteChange.deleteColumn("col3");
     InternalSchema newDeleteSchema = 
SchemaChangeUtils.applyTableChanges2Schema(newAddSchema, deleteChange);
+    assertEquals(newAddSchema.getMaxColumnId(), 
newDeleteSchema.getMaxColumnId());
 
     TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(newDeleteSchema);
     updateChange.updateColumnType("col2", Types.LongType.get())
@@ -67,25 +69,23 @@ public class TestMergeSchema {
     // merge schema by using columnType from query schema
     InternalSchema mergeSchema = new InternalSchemaMerger(oldSchema, 
finalSchema, true, false).mergeSchema();
 
-    InternalSchema checkedSchema = new 
InternalSchema(Types.RecordType.get(Arrays.asList(new Types.Field[] {
-            Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1 
after col1"),
-            Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before 
col3"),
-            Types.Field.get(3, true, "col4", Types.FloatType.get()),
-            Types.Field.get(1, true, "col2", Types.LongType.get(), "alter col2 
comments"),
-            Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(), 
"add new col1")
-        })));
-    Assertions.assertEquals(mergeSchema, checkedSchema);
+    InternalSchema checkedSchema = new 
InternalSchema(Types.RecordType.get(Arrays.asList(
+        Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1 after 
col1"),
+        Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before 
col3"),
+        Types.Field.get(3, true, "col4", Types.FloatType.get()),
+        Types.Field.get(1, true, "col2", Types.LongType.get(), "alter col2 
comments"),
+        Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(), "add 
new col1"))));
+    assertEquals(mergeSchema, checkedSchema);
 
     // merge schema by using columnType from file schema
     InternalSchema mergeSchema1 = new InternalSchemaMerger(oldSchema, 
finalSchema, true, true).mergeSchema();
-    InternalSchema checkedSchema1 = new 
InternalSchema(Types.RecordType.get(Arrays.asList(new Types.Field[] {
-            Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1 
after col1"),
-            Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before 
col3"),
-            Types.Field.get(3, true, "col4", Types.FloatType.get()),
-            Types.Field.get(1, true, "col2", Types.IntType.get(), "alter col2 
comments"),
-            Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(), 
"add new col1")
-        })));
-    Assertions.assertEquals(mergeSchema1, checkedSchema1);
+    InternalSchema checkedSchema1 = new 
InternalSchema(Types.RecordType.get(Arrays.asList(
+        Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1 after 
col1"),
+        Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before 
col3"),
+        Types.Field.get(3, true, "col4", Types.FloatType.get()),
+        Types.Field.get(1, true, "col2", Types.IntType.get(), "alter col2 
comments"),
+        Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(), "add 
new col1"))));
+    assertEquals(mergeSchema1, checkedSchema1);
   }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 77df8d08418..137efba2861 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.QuickstartUtils.{DataGenerator, 
convertToStringList, getQ
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
RawTripTestPayload}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.DataSourceTestUtils
@@ -436,22 +437,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           checkAnswer(createTestResult(tableName))(
             Seq(1, "jack", "haha", 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
           )
+          var maxColumnId = getMaxColumnId(tablePath)
           // drop column newprice
-
           spark.sql(s"alter table ${tableName} drop column newprice")
           checkAnswer(createTestResult(tableName))(
             Seq(1, "jack", "haha", 1000), Seq(2, "jack","exx1", 1000)
           )
+          validateInternalSchema(tablePath, isDropColumn = true, 
currentMaxColumnId = maxColumnId)
+          maxColumnId = getMaxColumnId(tablePath)
           // add newprice back
           spark.sql(s"alter table ${tableName} add columns(newprice string 
comment 'add newprice back' after ext1)")
           checkAnswer(createTestResult(tableName))(
             Seq(1, "jack", "haha", null, 1000), Seq(2, "jack","exx1", null, 
1000)
           )
+          validateInternalSchema(tablePath, isDropColumn = false, 
currentMaxColumnId = maxColumnId)
         }
       }
     })
   }
 
+  private def validateInternalSchema(basePath: String, isDropColumn: Boolean, 
currentMaxColumnId: Int): Unit = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build()
+    val schema = new 
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata.get()
+    val lastInstant = 
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
+    assert(schema.schemaId() == lastInstant.getTimestamp.toLong)
+    if (isDropColumn) {
+      assert(schema.getMaxColumnId == currentMaxColumnId)
+    } else {
+      assert(schema.getMaxColumnId == currentMaxColumnId + 1)
+    }
+  }
+
+  private def getMaxColumnId(basePath: String): Int = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build()
+    new 
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata.get.getMaxColumnId
+  }
+
   test("Test alter column nullability") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index 779bde3326d..ecb230a515f 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -102,14 +102,17 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
   }
 
-  def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
+  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
     val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
     deleteChanges.foreach { c =>
       val originalColName = c.fieldNames().mkString(".")
       checkSchemaChange(Seq(originalColName), table)
       deleteChange.deleteColumn(originalColName)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
deleteChange).setSchemaId(oldSchema.getMaxColumnId)
+    val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
deleteChange)
+    // delete action should not change the getMaxColumnId field
+    newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
+    newSchema
   }
 
 
@@ -128,8 +131,6 @@ case class AlterTableCommand(table: CatalogTable, changes: 
Seq[TableChange], cha
   def applyDeleteAction(sparkSession: SparkSession): Unit = {
     val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyDeleteAction2Schema(sparkSession, oldSchema, 
changes.map(_.asInstanceOf[DeleteColumn]))
-    // delete action should not change the getMaxColumnId field.
-    newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
     val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
       SerDeHelper.inheritSchemas(oldSchema, "")
     } else {

Reply via email to