This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 421f1cd6b75b3a9599edd0953273f606ac112e3d
Author: xiarixiaoyao <[email protected]>
AuthorDate: Wed Dec 7 10:38:54 2022 +0800

    [HUDI-5294] Support type change for schema on read + reconcile schema (#7326)

    * [HUDI-5294] Support type change for schema on read + reconcile schema
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../hudi/common/util/InternalSchemaCache.java      |  26 ++++-
 .../schema/utils/AvroSchemaEvolutionUtils.java     |  33 ++++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  32 +++++--
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 106 ++++++++++++++++++++-
 5 files changed, 172 insertions(+), 27 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 609f85e27fe..4cec5d1b0c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -293,7 +293,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     InternalSchema internalSchema;
     Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
     if (historySchemaStr.isEmpty()) {
-      internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
+      internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
       internalSchema.setSchemaId(Long.parseLong(instantTime));
     } else {
       internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
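For illustration, the fallback in the hunk above behaves like this minimal sketch (hedged: `config` and `avroSchema` are assumed to be in scope exactly as in the surrounding method):

    // Sketch only: SerDeHelper.fromJson yields an empty Option for a null or
    // empty JSON string, so tables that never serialized an InternalSchema
    // keep the old behavior of converting the Avro write schema.
    val internalSchema = SerDeHelper.fromJson(config.getInternalSchema)
      .orElse(AvroInternalSchemaConverter.convert(avroSchema))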
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index 846309b7b67..5f5a8763409 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
@@ -166,9 +169,11 @@ public class InternalSchemaCache {
    * step1:
    * try to parse internalSchema from HoodieInstant directly
    * step2:
-   * if we cannot parse internalSchema in step1,
+   * if we cannot parse internalSchema in step1 (e.g. the HoodieInstant for the current versionId has been archived),
    * try to find internalSchema in historySchema.
-   *
+   * step3:
+   * if we cannot find internalSchema in step2 (e.g. schema evolution was not enabled when the hoodie table was created, but was enabled after some inserts),
+   * try to convert the table schema to an internalSchema.
    * @param versionId the internalSchema version to be searched.
    * @param tablePath table path
    * @param hadoopConf conf
@@ -176,6 +181,7 @@ public class InternalSchemaCache {
    * @return an internalSchema.
    */
   public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, Configuration hadoopConf, String validCommits) {
+    String avroSchema = "";
     Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
     List<String> validateCommitList = commitSet.stream().map(fileName -> {
       String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
@@ -199,6 +205,7 @@ public class InternalSchemaCache {
       }
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      avroSchema = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
       if (latestInternalSchemaStr != null) {
         return SerDeHelper.fromJson(latestInternalSchemaStr).orElse(null);
       }
@@ -209,8 +216,19 @@ public class InternalSchemaCache {
     }
     // step2:
     FileBasedInternalSchemaStorageManager fileBasedInternalSchemaStorageManager = new FileBasedInternalSchemaStorageManager(hadoopConf, new Path(tablePath));
-    String lastestHistorySchema = fileBasedInternalSchemaStorageManager.getHistorySchemaStrByGivenValidCommits(validateCommitList);
-    return InternalSchemaUtils.searchSchema(versionId, SerDeHelper.parseSchemas(lastestHistorySchema));
+    String latestHistorySchema = fileBasedInternalSchemaStorageManager.getHistorySchemaStrByGivenValidCommits(validateCommitList);
+    if (latestHistorySchema.isEmpty()) {
+      return InternalSchema.getEmptyInternalSchema();
+    }
+    InternalSchema fileSchema = InternalSchemaUtils.searchSchema(versionId, SerDeHelper.parseSchemas(latestHistorySchema));
+    // step3:
+    return fileSchema.isEmptySchema() ? AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avroSchema))) : fileSchema;
+  }
+
+  public static InternalSchema getInternalSchemaByVersionId(long versionId, HoodieTableMetaClient metaClient) {
+    String validCommitLists = metaClient
+        .getCommitsAndCompactionTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getFileName).collect(Collectors.joining(","));
+    return getInternalSchemaByVersionId(versionId, metaClient.getBasePathV2().toString(), metaClient.getHadoopConf(), validCommitLists);
   }
 }
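For context, the new overload above makes the three-step lookup callable with just a meta client. A minimal usage sketch, with a hypothetical table path and versionId (and `spark` assumed to be an active SparkSession):

    // Sketch only: resolve the InternalSchema for one commit, falling back
    // step1 -> step2 -> step3 as described in the javadoc above.
    val metaClient = HoodieTableMetaClient.builder()
      .setConf(spark.sparkContext.hadoopConfiguration)
      .setBasePath("/tmp/hudi_trips")  // hypothetical table path
      .build()
    val schema = InternalSchemaCache.getInternalSchemaByVersionId(20221207103854L, metaClient)  // hypothetical versionId
    if (schema.isEmptySchema) {
      // neither the timeline, the history file, nor a commit's Avro schema
      // produced a usable schema for this versionId
    }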
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index e2b33915853..2561fc507e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -19,13 +19,11 @@
 package org.apache.hudi.internal.schema.utils;
 
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.action.TableChanges;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -41,7 +39,8 @@ public class AvroSchemaEvolutionUtils {
    * 2) incoming data contains new columns not defined yet in the table -> columns will be added to the table schema (incoming dataframe?)
    * 3) incoming data has missing columns that are already defined in the table and new columns not yet defined in the table ->
    *    new columns will be added to the table schema, missing columns will be injected with null values
-   * 4) support nested schema change.
+   * 4) support type change
+   * 5) support nested schema change.
    * Notice:
    *    the incoming schema should not have delete/rename semantics.
    *    for example: incoming schema: int a, int b, int d; oldTableSchema int a, int b, int c, int d
@@ -52,25 +51,30 @@ public class AvroSchemaEvolutionUtils {
    */
  public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) {
     InternalSchema inComingInternalSchema = AvroInternalSchemaConverter.convert(incomingSchema);
-    // do check, only support add column evolution
+    // check column add/missing
     List<String> colNamesFromIncoming = inComingInternalSchema.getAllColsFullName();
     List<String> colNamesFromOldSchema = oldTableSchema.getAllColsFullName();
     List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromIncoming.contains(f)).collect(Collectors.toList());
-    List<Types.Field> newFields = new ArrayList<>();
-    if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) {
+    List<String> diffFromEvolutionColumns = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+    // check type change.
+    List<String> typeChangeColumns = colNamesFromIncoming
+        .stream()
+        .filter(f -> colNamesFromOldSchema.contains(f) && !inComingInternalSchema.findType(f).equals(oldTableSchema.findType(f)))
+        .collect(Collectors.toList());
+    if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0 && typeChangeColumns.isEmpty()) {
       return oldTableSchema;
     }
-    List<String> diffFromEvolutionSchema = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+
     // Remove redundancy from diffFromEvolutionSchema.
     // for example, now we add a struct col in evolvedSchema, the struct col is " user struct<name:string, age:int> "
     // when we do diff operation: user, user.name, user.age will appear in the result set, which is redundant; user.name and user.age should be excluded.
     // deal with add operation
     TreeMap<Integer, String> finalAddAction = new TreeMap<>();
-    for (int i = 0; i < diffFromEvolutionSchema.size(); i++) {
-      String name = diffFromEvolutionSchema.get(i);
+    for (int i = 0; i < diffFromEvolutionColumns.size(); i++) {
+      String name = diffFromEvolutionColumns.get(i);
       int splitPoint = name.lastIndexOf(".");
       String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
-      if (!parentName.isEmpty() && diffFromEvolutionSchema.contains(parentName)) {
+      if (!parentName.isEmpty() && diffFromEvolutionColumns.contains(parentName)) {
         // find redundancy, skip it
         continue;
       }
@@ -94,7 +98,14 @@ public class AvroSchemaEvolutionUtils {
       inferPosition.map(i -> addChange.addPositionChange(name, i, "before"));
     });
 
-    return SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
+    // do type evolution.
+    InternalSchema internalSchemaAfterAddColumns = SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
+    TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(internalSchemaAfterAddColumns);
+    typeChangeColumns.stream().filter(f -> !inComingInternalSchema.findType(f).isNestedType()).forEach(col -> {
+      typeChange.updateColumnType(col, inComingInternalSchema.findType(col));
+    });
+
+    return SchemaChangeUtils.applyTableChanges2Schema(internalSchemaAfterAddColumns, typeChange);
   }
 
   /**
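To make the reconcile rules concrete: given a table schema `a int, b long` and an incoming schema where `b` arrives as string and `c` is new, the method should now update the type of `b` (rule 4) and append `c` (rule 2). A hedged sketch, assuming `oldTableSchema` is the table's existing InternalSchema:

    // Sketch only: reconcile an incoming Avro schema against the table schema.
    val incoming: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[
        |{"name":"a","type":"int"},
        |{"name":"b","type":"string"},
        |{"name":"c","type":["null","string"],"default":null}]}""".stripMargin)
    val reconciled = AvroSchemaEvolutionUtils.reconcileSchema(incoming, oldTableSchema)
    // reconciled: a int, b string (type updated), c appended (null-filled for old rows)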
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0122576a679..9442195c608 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -258,8 +258,10 @@ object HoodieSparkSqlWriter {
       if (reconcileSchema) {
         // In case we need to reconcile the schema and schema evolution is enabled,
         // we will force-apply schema evolution to the writer's schema
-        if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-          internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
+        if (schemaEvolutionEnabled) {
+          // in case sourceSchema contains HoodieRecord.HOODIE_META_COLUMNS
+          val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+          AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchema, allowOperationMetaDataField))
         }
 
         if (internalSchemaOpt.isDefined) {
@@ -314,7 +316,7 @@ object HoodieSparkSqlWriter {
         // Create a HoodieWriteClient & issue the write.
         val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
-          tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
+          tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt, Some(writerDataSchema)) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
         )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
         if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -399,10 +401,27 @@ object HoodieSparkSqlWriter {
     processedRecord
   }
 
-  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
+  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema], writeSchemaOpt: Option[Schema] = None): Map[String, String] = {
     val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
-    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
-      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
+
+    val schemaValidateEnable = if (schemaEvolutionEnable.toBoolean && parameters.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), "false").toBoolean) {
+      // force-disable schema validation: schema evolution is supported here, so there is no need to validate
+      "false"
+    } else {
+      parameters.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), "true")
+    }
+    // correct internalSchema: internalSchema should contain the hoodie metadata columns.
+    val correctInternalSchema = internalSchemaOpt.map { internalSchema =>
+      if (internalSchema.findField(HoodieRecord.RECORD_KEY_METADATA_FIELD) == null && writeSchemaOpt.isDefined) {
+        val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+        AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(writeSchemaOpt.get, allowOperationMetaDataField))
+      } else {
+        internalSchema
+      }
+    }
+    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(correctInternalSchema.getOrElse(null)),
+      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key() -> schemaValidateEnable)
   }
 
   /**
@@ -411,7 +430,6 @@ object HoodieSparkSqlWriter {
    * @param fs instance of FileSystem.
    * @param basePath base path.
    * @param sparkContext instance of spark context.
-   * @param schema incoming record's schema.
    * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
    */
   def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
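The net effect of the new addSchemaEvolutionParameters is easiest to see from the returned map. A hedged sketch (assuming `internalSchemaOpt` is a defined Option[InternalSchema] and `writerSchema` is the writer's Avro schema; the string literals are the 0.12 config keys backing the constants used above):

    // Sketch only: with an internal schema present and reconcile enabled,
    // Avro schema validation is forced off and the serialized InternalSchema
    // is rebuilt to carry the hoodie meta columns from the writer schema.
    val params = Map(
      "hoodie.datasource.write.reconcile.schema" -> "true",
      "hoodie.schema.on.read.enable" -> "true")
    val out = HoodieSparkSqlWriter.addSchemaEvolutionParameters(params, internalSchemaOpt, Some(writerSchema))
    assert(out("hoodie.avro.schema.validate") == "false")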
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 9d955cb8310..ed9db3a5aa4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -18,13 +18,16 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, TABLE_NAME}
+import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.functions.{arrays_zip, col}
+import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConversions._
@@ -511,7 +514,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test schema auto evolution") {
+  test("Test schema auto evolution complex") {
     withTempDir { tmp =>
       Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
         val tableName = generateTableName
@@ -534,7 +537,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
           DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
           "hoodie.schema.on.read.enable" -> "true",
-          "hoodie.datasource.write.reconcile.schema" -> "true",
          DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
         )
 
@@ -546,7 +548,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           .save(tablePath)
         val oldView = spark.read.format("hudi").load(tablePath)
-        oldView.show(false)
+        oldView.show(5, false)
 
         val records2 = RawTripTestPayload.recordsToStrings(dataGen.generateUpdatesAsPerSchema("002", 100, schema)).toList
         val inputD2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
@@ -571,4 +573,100 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test schema auto evolution") {
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        // for complex schema.
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          val dataGen = new DataGenerator
+          val inserts = convertToStringList(dataGen.generateInserts(10))
+          val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+          df.write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable", "true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("overwrite").
+            save(tablePath)
+
+          val updates = convertToStringList(dataGen.generateUpdates(10))
+          // type change: fare (double -> String)
+          // add a new column and drop an existing column
+          val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+            .withColumn("fare", expr("cast(fare as string)"))
+            .withColumn("addColumn", lit("new"))
+          dfUpdate.drop("begin_lat").write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable", "true").
+            option("hoodie.datasource.write.reconcile.schema", "true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("append").
+            save(tablePath)
+          spark.sql("set hoodie.schema.on.read.enable=true")
+
+          val snapshotDF = spark.read.format("hudi").load(tablePath)
+
+          assertResult(StringType)(snapshotDF.schema.fields.filter(_.name == "fare").head.dataType)
+          assertResult("addColumn")(snapshotDF.schema.fields.last.name)
+          val checkRowKey = dfUpdate.select("fare").collectAsList().map(_.getString(0)).get(0)
+          snapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+          checkAnswer(spark.sql(s"select fare, addColumn from hudi_trips_snapshot where fare = ${checkRowKey}").collect())(
+            Seq(checkRowKey, "new")
+          )
+
+          spark.sql(s"select * from hudi_trips_snapshot").show(false)
+          // test insert_overwrite + update again
+          val overwrite = convertToStringList(dataGen.generateInserts(10))
+          val dfOverWrite = spark.
+            read.json(spark.sparkContext.parallelize(overwrite, 2)).
+            filter("partitionpath = 'americas/united_states/san_francisco'")
+            .withColumn("fare", expr("cast(fare as string)")) // fare in the table is now string type; converting string back to double is forbidden.
+          dfOverWrite.write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option("hoodie.datasource.write.operation", "insert_overwrite").
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable", "true").
+            option("hoodie.datasource.write.reconcile.schema", "true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("append").
+            save(tablePath)
+          spark.read.format("hudi").load(tablePath).show(false)
+
+          val updatesAgain = convertToStringList(dataGen.generateUpdates(10))
+          val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).withColumn("fare", expr("cast(fare as string)"))
+          dfAgain.write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable", "true").
+            option("hoodie.datasource.write.reconcile.schema", "true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("append").
+            save(tablePath)
+          spark.read.format("hudi").load(tablePath).createOrReplaceTempView("hudi_trips_snapshot1")
+          val checkKey = dfAgain.select("fare").collectAsList().map(_.getString(0)).get(0)
+          checkAnswer(spark.sql(s"select fare, addColumn from hudi_trips_snapshot1 where fare = ${checkKey}").collect())(
+            Seq(checkKey, null)
+          )
+        }
+      }
+    }
+  }
 }
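Taken together, the user-visible behavior exercised by the new test boils down to a write like the following condensed sketch (paths and names hypothetical): a double -> string change on `fare` is reconciled into the table schema instead of failing Avro schema validation.

    // Sketch only: append with an evolved column type; both flags are required.
    df.withColumn("fare", expr("cast(fare as string)"))
      .write.format("hudi")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.schema.on.read.enable", "true")
      .option("hoodie.datasource.write.reconcile.schema", "true")
      .option("hoodie.table.name", "hudi_trips")  // hypothetical table name
      .mode("append")
      .save("/tmp/hudi_trips")                    // hypothetical path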
