This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f2c1b4d9e8f5d77a5bc67bcfb1bfa30a204ef46f
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue May 14 16:42:59 2024 -0700

    [HUDI-6854] Change default payload type to HOODIE_AVRO_DEFAULT (#10949)
---
 .../main/java/org/apache/hudi/config/HoodiePayloadConfig.java    | 4 ++--
 .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java  | 4 ++--
 .../org/apache/hudi/common/model/DefaultHoodieRecordPayload.java | 4 +++-
 .../apache/hudi/common/model/OverwriteWithLatestAvroPayload.java | 2 --
 .../java/org/apache/hudi/common/table/HoodieTableConfig.java     | 4 ++--
 .../scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala   | 8 ++++----
 .../org/apache/hudi/functional/TestHiveTableSchemaEvolution.java | 3 ++-
 .../org/apache/hudi/functional/TestBasicSchemaEvolution.scala    | 9 ++++++---
 .../apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala    | 4 ++--
 .../test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 7 ++++++-
 10 files changed, 29 insertions(+), 20 deletions(-)
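With the new default, merges honor the precombine (ordering) field instead of simply taking the most recent write. Writers that depended on the previous overwrite-latest behavior can pin the old payload explicitly through hoodie.datasource.write.payload.class (the key owned by WRITE_PAYLOAD_CLASS_NAME in the diff below). A minimal Scala sketch, assuming a spark-shell with the Hudi bundle on the classpath; the table name and path are hypothetical:

    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("payload-override").getOrCreate()
    import spark.implicits._

    // Hypothetical records keyed by uuid and ordered by ts.
    val df = Seq(("id-1", 1L, 10.0), ("id-2", 2L, 20.0)).toDF("uuid", "ts", "fare")

    df.write.format("hudi").
      option("hoodie.table.name", "payload_demo").
      option("hoodie.datasource.write.recordkey.field", "uuid").
      option("hoodie.datasource.write.precombine.field", "ts").
      // Restore the previous default merge behavior for this writer.
      option("hoodie.datasource.write.payload.class", classOf[OverwriteWithLatestAvroPayload].getName).
      mode("append").
      save("/tmp/payload_demo")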
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 3929dcba047..5c70000bd6c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 
 import java.io.File;
 import java.io.FileReader;
@@ -50,7 +50,7 @@ public class HoodiePayloadConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.compaction.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(DefaultHoodieRecordPayload.class.getName())
       .markAdvanced()
       .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
           + "the record payload class to merge records in the log against each other, merge again with the base file and "
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3220ef22c2f..558aba5b17b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -35,13 +35,13 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -148,7 +148,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(DefaultHoodieRecordPayload.class.getName())
       .markAdvanced()
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
          + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index daa1dcb0207..a3e6ce1f133 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -37,9 +37,11 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * Default payload.
  * {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue.
  * <p>
- * 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field
+ * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
  */
 public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
   public static final String METADATA_EVENT_TIME_KEY = "metadata.event_time.key";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index d9fbd4cba05..dac9b828896 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -30,8 +30,6 @@ import java.io.IOException;
 import java.util.Objects;
 
 /**
- * Default payload.
- *
  * <ol>
 * <li> preCombine - Picks the latest delta record for a key, based on an ordering field;
 * <li> combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record
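The behavioral difference the two javadoc blocks above describe, as a sketch (hypothetical table name and path; reuses the SparkSession and implicits from the first sketch): two upserts of the same key arriving out of event-time order.

    // The second write is later in wall-clock time but carries a lower ts.
    val first  = Seq(("id-1", 5L, "v1")).toDF("uuid", "ts", "value")
    val second = Seq(("id-1", 3L, "v2")).toDF("uuid", "ts", "value")

    def upsert(batch: org.apache.spark.sql.DataFrame): Unit =
      batch.write.format("hudi").
        option("hoodie.table.name", "merge_demo").
        option("hoodie.datasource.write.recordkey.field", "uuid").
        option("hoodie.datasource.write.precombine.field", "ts").
        option("hoodie.datasource.write.operation", "upsert").
        mode("append").
        save("/tmp/merge_demo")

    upsert(first)
    upsert(second)
    // DefaultHoodieRecordPayload compares the ordering field in
    // combineAndGetUpdateValue, so the stored ts=5 record wins and "v1" survives.
    // OverwriteWithLatestAvroPayload would overwrite storage with the latest
    // delta record, leaving "v2".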
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f0674da2c6c..16539ac1a32 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -24,12 +24,12 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -163,7 +163,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.compaction.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(DefaultHoodieRecordPayload.class.getName())
       .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 02a6a151dea..782c1a2bc06 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.{DataSourceWriteOptions, HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
-import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, WriteOperationType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, HoodieWriteConfig}
@@ -44,8 +44,8 @@ import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyP
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE
 import org.apache.spark.sql.types.StructType
-
 import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 trait ProvidesHoodieConfig extends Logging {
@@ -102,7 +102,7 @@ trait ProvidesHoodieConfig extends Logging {
       // Validate duplicate key for inserts to COW table when using strict insert mode.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
-      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+      classOf[DefaultHoodieRecordPayload].getCanonicalName
     }
   }
@@ -276,7 +276,7 @@ trait ProvidesHoodieConfig extends Logging {
     if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) {
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
-      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+      classOf[DefaultHoodieRecordPayload].getCanonicalName
     }
   }
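The same opt-out exists per table on the SQL path, since tblproperties flow into HoodieTableConfig. A hedged sketch (hypothetical table name; assumes a Hudi-enabled SparkSession), mirroring the override that TestHiveTableSchemaEvolution applies below:

    spark.sql(
      """create table payload_demo_sql (id int, ts long, value string) using hudi
        |tblproperties (
        |  type = 'mor', primaryKey = 'id', preCombineField = 'ts',
        |  hoodie.compaction.payload.class = 'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload'
        |)""".stripMargin)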
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index dff9d2e9ccc..a5a45cabf81 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -97,7 +97,8 @@ public class TestHiveTableSchemaEvolution {
     spark.sql("set hoodie.schema.on.read.enable=true");
     spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
-        + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
+        + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1', "
+        + "hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'",
         tableName, tableType, path));
     spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
     spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index dfb69da29c0..6e7615b54c0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -17,9 +17,8 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
@@ -28,15 +27,18 @@ import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectCo
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
+
+import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
+
 import scala.collection.JavaConversions.asScalaBuffer
 import scala.collection.JavaConverters._
 
@@ -49,6 +51,7 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
     HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
+    HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
index 31e5f96d5d8..2a7de760230 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
@@ -35,13 +35,13 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
     assertTrue(with1.size == 4)
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
-    assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
+    assertTrue(with1("payloadClass") == classOf[DefaultHoodieRecordPayload].getName)
     assertTrue(with1("recordMergerStrategy") == HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
-      "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
+      "payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName,
       "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 9f23494ae79..5e43d714a5e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -715,6 +715,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val dataGen = new QuickstartUtils.DataGenerator
         val inserts = QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
         val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+          .withColumn("ts", lit("20240404000000")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
df.write.format("hudi"). options(QuickstartUtils.getQuickstartWriteConfigs). option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType). @@ -733,6 +734,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2)) .withColumn("fare", expr("cast(fare as string)")) .withColumn("addColumn", lit("new")) + .withColumn("ts", lit("20240404000005")) // to make test determinate for HOODIE_AVRO_DEFAULT payload dfUpdate.drop("begin_lat").write.format("hudi"). options(QuickstartUtils.getQuickstartWriteConfigs). option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType). @@ -763,6 +765,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val dfOverWrite = spark. read.json(spark.sparkContext.parallelize(overwrite, 2)). filter("partitionpath = 'americas/united_states/san_francisco'") + .withColumn("ts", lit("20240404000010")) // to make test determinate for HOODIE_AVRO_DEFAULT payload .withColumn("fare", expr("cast(fare as string)")) // fare now in table is string type, we forbid convert string to double. dfOverWrite.write.format("hudi"). options(QuickstartUtils.getQuickstartWriteConfigs). @@ -779,7 +782,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.read.format("hudi").load(tablePath).show(false) val updatesAgain = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10)) - val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).withColumn("fare", expr("cast(fare as string)")) + val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)). + withColumn("fare", expr("cast(fare as string)")). + withColumn("ts", lit("20240404000015")) // to make test determinate for HOODIE_AVRO_DEFAULT payload dfAgain.write.format("hudi"). options(QuickstartUtils.getQuickstartWriteConfigs). option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
