This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ed1786d5bdc [HUDI-6865] Fix InternalSchema schemaId when column is dropped (#9724)
ed1786d5bdc is described below
commit ed1786d5bdc210c8d516bb103130265f912c86d3
Author: Sagar Sumit <[email protected]>
AuthorDate: Mon Sep 18 09:38:49 2023 +0530
[HUDI-6865] Fix InternalSchema schemaId when column is dropped (#9724)
---
.../hudi/internal/schema/InternalSchema.java | 9 +++--
.../schema/action/InternalSchemaMerger.java | 6 ++--
.../internal/schema/utils/InternalSchemaUtils.java | 12 +++----
.../internal/schema/action/TestMergeSchema.java | 38 +++++++++++-----------
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 25 +++++++++++++-
.../spark/sql/hudi/command/AlterTableCommand.scala | 9 ++---
6 files changed, 61 insertions(+), 38 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
index 237eb95285c..ce5f8f259da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/InternalSchema.java
@@ -158,12 +158,12 @@ public class InternalSchema implements Serializable {
}
/**
- * Returns the {@link Type} of a sub-field identified by the field name.
+ * Returns the fully qualified name of the field corresponding to the given
id.
*
* @param id a field id
- * @return fullName of field of
+ * @return full name of field corresponding to id
*/
- public String findfullName(int id) {
+ public String findFullName(int id) {
if (idToName == null) {
buildIdToName();
}
@@ -272,8 +272,7 @@ public class InternalSchema implements Serializable {
public String toString() {
return String.format("table {\n%s\n}",
StringUtils.join(record.fields().stream()
- .map(f -> " " + f)
- .collect(Collectors.toList()).toArray(new String[0]), "\n"));
+ .map(f -> " " + f).toArray(String[]::new), "\n"));
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
index 17a53d8139d..9ed55a7e573 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
@@ -116,9 +116,9 @@ public class InternalSchemaMerger {
Type newType = newTypes.get(i);
Types.Field oldField = oldFields.get(i);
int fieldId = oldField.fieldId();
- String fullName = querySchema.findfullName(fieldId);
+ String fullName = querySchema.findFullName(fieldId);
if (fileSchema.findField(fieldId) != null) {
- if (fileSchema.findfullName(fieldId).equals(fullName)) {
+ if (fileSchema.findFullName(fieldId).equals(fullName)) {
// maybe col type changed, deal with it.
newFields.add(Types.Field.get(oldField.fieldId(),
oldField.isOptional(), oldField.name(), newType, oldField.doc()));
} else {
@@ -173,7 +173,7 @@ public class InternalSchemaMerger {
}
String parentName = sb.toString();
int parentFieldIdFromQuerySchema = querySchema.findIdByName(parentName);
- String parentNameFromFileSchema =
fileSchema.findfullName(parentFieldIdFromQuerySchema);
+ String parentNameFromFileSchema =
fileSchema.findFullName(parentFieldIdFromQuerySchema);
if (parentNameFromFileSchema.isEmpty()) {
break;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index cf66986e155..94e72ff7180 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -184,13 +184,13 @@ public class InternalSchemaUtils {
// the read file does not contain current col, so current colFilter is
invalid
return "";
} else {
- if (name.equals(fileSchema.findfullName(nameId))) {
+ if (name.equals(fileSchema.findFullName(nameId))) {
// no change happened on current col
return name;
} else {
// find rename operation on current col
// return the name from fileSchema
- return fileSchema.findfullName(nameId);
+ return fileSchema.findFullName(nameId);
}
}
}
@@ -210,8 +210,8 @@ public class InternalSchemaUtils {
Map<Integer, Pair<Type, Type>> result = new HashMap<>();
ids.stream().filter(f -> otherIds.contains(f)).forEach(f -> {
if (!schema.findType(f).equals(oldSchema.findType(f))) {
- String[] fieldNameParts = schema.findfullName(f).split("\\.");
- String[] otherFieldNameParts = oldSchema.findfullName(f).split("\\.");
+ String[] fieldNameParts = schema.findFullName(f).split("\\.");
+ String[] otherFieldNameParts = oldSchema.findFullName(f).split("\\.");
String parentName = fieldNameParts[0];
String otherParentName = otherFieldNameParts[0];
if (fieldNameParts.length == otherFieldNameParts.length &&
schema.findIdByName(parentName) == oldSchema.findIdByName(otherParentName)) {
@@ -280,8 +280,8 @@ public class InternalSchemaUtils {
return colNamesFromWriteSchema.stream().filter(f -> {
int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
// try to find the cols which has the same id, but have different
colName;
- return newSchema.getAllIds().contains(fieldIdFromWriteSchema) &&
!newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
- }).collect(Collectors.toMap(e ->
newSchema.findfullName(oldSchema.findIdByName(e)), e -> {
+ return newSchema.getAllIds().contains(fieldIdFromWriteSchema) &&
!newSchema.findFullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
+ }).collect(Collectors.toMap(e ->
newSchema.findFullName(oldSchema.findIdByName(e)), e -> {
int lastDotIndex = e.lastIndexOf(".");
return e.substring(lastDotIndex == -1 ? 0 : lastDotIndex + 1);
}));
diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
index 5a311c239dc..5240179fb8c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
+++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
@@ -22,11 +22,12 @@ import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
* Tests {@link InternalSchemaMerger}.
*/
@@ -34,11 +35,11 @@ public class TestMergeSchema {
@Test
public void testPrimitiveMerge() {
- Types.RecordType record = Types.RecordType.get(Arrays.asList(new
Types.Field[] {
+ Types.RecordType record = Types.RecordType.get(Arrays.asList(
Types.Field.get(0, "col1", Types.BooleanType.get()),
Types.Field.get(1, "col2", Types.IntType.get()),
Types.Field.get(2, "col3", Types.LongType.get()),
- Types.Field.get(3, "col4", Types.FloatType.get())}));
+ Types.Field.get(3, "col4", Types.FloatType.get())));
InternalSchema oldSchema = new InternalSchema(record);
// add c1 after 'col1', and c2 before 'col3'
@@ -52,6 +53,7 @@ public class TestMergeSchema {
deleteChange.deleteColumn("col1");
deleteChange.deleteColumn("col3");
InternalSchema newDeleteSchema =
SchemaChangeUtils.applyTableChanges2Schema(newAddSchema, deleteChange);
+ assertEquals(newAddSchema.getMaxColumnId(),
newDeleteSchema.getMaxColumnId());
TableChanges.ColumnUpdateChange updateChange =
TableChanges.ColumnUpdateChange.get(newDeleteSchema);
updateChange.updateColumnType("col2", Types.LongType.get())
@@ -67,25 +69,23 @@ public class TestMergeSchema {
// merge schema by using columnType from query schema
InternalSchema mergeSchema = new InternalSchemaMerger(oldSchema,
finalSchema, true, false).mergeSchema();
- InternalSchema checkedSchema = new
InternalSchema(Types.RecordType.get(Arrays.asList(new Types.Field[] {
- Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1
after col1"),
- Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before
col3"),
- Types.Field.get(3, true, "col4", Types.FloatType.get()),
- Types.Field.get(1, true, "col2", Types.LongType.get(), "alter col2
comments"),
- Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(),
"add new col1")
- })));
- Assertions.assertEquals(mergeSchema, checkedSchema);
+ InternalSchema checkedSchema = new
InternalSchema(Types.RecordType.get(Arrays.asList(
+ Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1 after
col1"),
+ Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before
col3"),
+ Types.Field.get(3, true, "col4", Types.FloatType.get()),
+ Types.Field.get(1, true, "col2", Types.LongType.get(), "alter col2
comments"),
+ Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(), "add
new col1"))));
+ assertEquals(mergeSchema, checkedSchema);
// merge schema by using columnType from file schema
InternalSchema mergeSchema1 = new InternalSchemaMerger(oldSchema,
finalSchema, true, true).mergeSchema();
- InternalSchema checkedSchema1 = new
InternalSchema(Types.RecordType.get(Arrays.asList(new Types.Field[] {
- Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1
after col1"),
- Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before
col3"),
- Types.Field.get(3, true, "col4", Types.FloatType.get()),
- Types.Field.get(1, true, "col2", Types.IntType.get(), "alter col2
comments"),
- Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(),
"add new col1")
- })));
- Assertions.assertEquals(mergeSchema1, checkedSchema1);
+ InternalSchema checkedSchema1 = new
InternalSchema(Types.RecordType.get(Arrays.asList(
+ Types.Field.get(4, true, "c1", Types.BooleanType.get(), "add c1 after
col1"),
+ Types.Field.get(5, true, "c2", Types.IntType.get(), "add c2 before
col3"),
+ Types.Field.get(3, true, "col4", Types.FloatType.get()),
+ Types.Field.get(1, true, "col2", Types.IntType.get(), "alter col2
comments"),
+ Types.Field.get(6, true, "col1suffix", Types.BooleanType.get(), "add
new col1"))));
+ assertEquals(mergeSchema1, checkedSchema1);
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 77df8d08418..137efba2861 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQ
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
RawTripTestPayload}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.DataSourceTestUtils
@@ -436,22 +437,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
checkAnswer(createTestResult(tableName))(
Seq(1, "jack", "haha", 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
)
+ var maxColumnId = getMaxColumnId(tablePath)
// drop column newprice
-
spark.sql(s"alter table ${tableName} drop column newprice")
checkAnswer(createTestResult(tableName))(
Seq(1, "jack", "haha", 1000), Seq(2, "jack","exx1", 1000)
)
+ validateInternalSchema(tablePath, isDropColumn = true,
currentMaxColumnId = maxColumnId)
+ maxColumnId = getMaxColumnId(tablePath)
// add newprice back
spark.sql(s"alter table ${tableName} add columns(newprice string
comment 'add newprice back' after ext1)")
checkAnswer(createTestResult(tableName))(
Seq(1, "jack", "haha", null, 1000), Seq(2, "jack","exx1", null,
1000)
)
+ validateInternalSchema(tablePath, isDropColumn = false,
currentMaxColumnId = maxColumnId)
}
}
})
}
+ private def validateInternalSchema(basePath: String, isDropColumn: Boolean,
currentMaxColumnId: Int): Unit = {
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build()
+ val schema = new
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata.get()
+ val lastInstant =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
+ assert(schema.schemaId() == lastInstant.getTimestamp.toLong)
+ if (isDropColumn) {
+ assert(schema.getMaxColumnId == currentMaxColumnId)
+ } else {
+ assert(schema.getMaxColumnId == currentMaxColumnId + 1)
+ }
+ }
+
+ private def getMaxColumnId(basePath: String): Int = {
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build()
+ new
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata.get.getMaxColumnId
+ }
+
test("Test alter column nullability") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
diff --git a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index 779bde3326d..ecb230a515f 100644
--- a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -102,14 +102,17 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
}
- def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema:
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
+ private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema:
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
deleteChanges.foreach { c =>
val originalColName = c.fieldNames().mkString(".")
checkSchemaChange(Seq(originalColName), table)
deleteChange.deleteColumn(originalColName)
}
- SchemaChangeUtils.applyTableChanges2Schema(oldSchema,
deleteChange).setSchemaId(oldSchema.getMaxColumnId)
+ val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema,
deleteChange)
+ // delete action should not change the getMaxColumnId field
+ newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
+ newSchema
}
@@ -128,8 +131,6 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
def applyDeleteAction(sparkSession: SparkSession): Unit = {
val (oldSchema, historySchema) =
getInternalSchemaAndHistorySchemaStr(sparkSession)
val newSchema = applyDeleteAction2Schema(sparkSession, oldSchema,
changes.map(_.asInstanceOf[DeleteColumn]))
- // delete action should not change the getMaxColumnId field.
- newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
val verifiedHistorySchema = if (historySchema == null ||
historySchema.isEmpty) {
SerDeHelper.inheritSchemas(oldSchema, "")
} else {