This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f2c1b4d9e8f5d77a5bc67bcfb1bfa30a204ef46f
Author: Vova Kolmakov <[email protected]>
AuthorDate: Tue May 14 16:42:59 2024 -0700

    [HUDI-6854] Change default payload type to HOODIE_AVRO_DEFAULT (#10949)
---
 .../main/java/org/apache/hudi/config/HoodiePayloadConfig.java    | 4 ++--
 .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java  | 4 ++--
 .../org/apache/hudi/common/model/DefaultHoodieRecordPayload.java | 4 +++-
 .../apache/hudi/common/model/OverwriteWithLatestAvroPayload.java | 2 --
 .../java/org/apache/hudi/common/table/HoodieTableConfig.java     | 4 ++--
 .../scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala   | 8 ++++----
 .../org/apache/hudi/functional/TestHiveTableSchemaEvolution.java | 3 ++-
 .../org/apache/hudi/functional/TestBasicSchemaEvolution.scala    | 9 ++++++---
 .../apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala    | 4 ++--
 .../test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 7 ++++++-
 10 files changed, 29 insertions(+), 20 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 3929dcba047..5c70000bd6c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 
 import java.io.File;
 import java.io.FileReader;
@@ -50,7 +50,7 @@ public class HoodiePayloadConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = 
ConfigProperty
       .key("hoodie.compaction.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(DefaultHoodieRecordPayload.class.getName())
       .markAdvanced()
      .withDocumentation("This needs to be the same as the class used during 
insert/upserts. Just like writing, compaction also uses "
         + "the record payload class to merge records in the log against each 
other, merge again with the base file and "
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3220ef22c2f..558aba5b17b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -35,13 +35,13 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -148,7 +148,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = 
ConfigProperty
       .key("hoodie.datasource.write.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(DefaultHoodieRecordPayload.class.getName())
       .markAdvanced()
       .withDocumentation("Payload class used. Override this, if you like to 
roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL 
ineffective");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index daa1dcb0207..a3e6ce1f133 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -37,9 +37,11 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * Default payload.
  * {@link HoodieRecordPayload} impl that honors ordering field in both 
preCombine and combineAndGetUpdateValue.
  * <p>
- * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest 
record based on ordering field value.
+ * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field
+ * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record 
based on ordering field value.
  */
 public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload 
{
   public static final String METADATA_EVENT_TIME_KEY = 
"metadata.event_time.key";
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index d9fbd4cba05..dac9b828896 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -30,8 +30,6 @@ import java.io.IOException;
 import java.util.Objects;
 
 /**
- * Default payload.
- *
  * <ol>
  * <li> preCombine - Picks the latest delta record for a key, based on an 
ordering field;
  * <li> combineAndGetUpdateValue/getInsertValue - Simply overwrites storage 
with latest delta record
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f0674da2c6c..16539ac1a32 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -24,12 +24,12 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -163,7 +163,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = 
ConfigProperty
       .key("hoodie.compaction.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .defaultValue(DefaultHoodieRecordPayload.class.getName())
      .withDocumentation("Payload class to use for performing compactions, i.e. 
merge delta logs with current base file and then "
           + " produce a new base file.");
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 02a6a151dea..782c1a2bc06 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.{DataSourceWriteOptions, 
HoodieFileIndex}
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
TypedProperties}
-import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, 
WriteOperationType}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieInternalConfig, 
HoodieWriteConfig}
@@ -44,8 +44,8 @@ import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyP
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PARTITION_OVERWRITE_MODE
 import org.apache.spark.sql.types.StructType
-
 import java.util.Locale
+
 import scala.collection.JavaConverters._
 
 trait ProvidesHoodieConfig extends Logging {
@@ -102,7 +102,7 @@ trait ProvidesHoodieConfig extends Logging {
       // Validate duplicate key for inserts to COW table when using strict 
insert mode.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
-      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+      classOf[DefaultHoodieRecordPayload].getCanonicalName
     }
   }
 
@@ -276,7 +276,7 @@ trait ProvidesHoodieConfig extends Logging {
       if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) {
         classOf[ValidateDuplicateKeyPayload].getCanonicalName
       } else {
-        classOf[OverwriteWithLatestAvroPayload].getCanonicalName
+        classOf[DefaultHoodieRecordPayload].getCanonicalName
       }
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index dff9d2e9ccc..a5a45cabf81 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -97,7 +97,8 @@ public class TestHiveTableSchemaEvolution {
 
       spark.sql("set hoodie.schema.on.read.enable=true");
       spark.sql(String.format("create table %s (col0 int, col1 float, col2 
string) using hudi "
-              + "tblproperties (type='%s', primaryKey='col0', 
preCombineField='col1') location '%s'",
+              + "tblproperties (type='%s', primaryKey='col0', 
preCombineField='col1', "
+              + 
"hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload')
 location '%s'",
           tableName, tableType, path));
       spark.sql(String.format("insert into %s values(1, 1.1, 'text')", 
tableName));
       spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", 
tableName));
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index dfb69da29c0..6e7615b54c0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -17,9 +17,8 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, 
WriteOperationType}
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, 
OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
@@ -28,15 +27,18 @@ import 
org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectCo
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
ScalaAssertionSupport}
+
+import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructField, StructType}
 import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, 
SparkSessionExtensions, functions}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
+
 import scala.collection.JavaConversions.asScalaBuffer
 import scala.collection.JavaConverters._
 
@@ -49,6 +51,7 @@ class TestBasicSchemaEvolution extends 
HoodieSparkClientTestBase with ScalaAsser
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
     HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
+    HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> 
classOf[OverwriteWithLatestAvroPayload].getName,
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
index 31e5f96d5d8..2a7de760230 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
@@ -35,13 +35,13 @@ class TestHoodieOptionConfig extends 
SparkClientFunctionalTestHarness {
     assertTrue(with1.size == 4)
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
-    assertTrue(with1("payloadClass") == 
classOf[OverwriteWithLatestAvroPayload].getName)
+    assertTrue(with1("payloadClass") == 
classOf[DefaultHoodieRecordPayload].getName)
     assertTrue(with1("recordMergerStrategy") == 
HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
-      "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
+      "payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName,
       "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 9f23494ae79..5e43d714a5e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -715,6 +715,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val dataGen = new QuickstartUtils.DataGenerator
           val inserts = 
QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
           val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+            .withColumn("ts", lit("20240404000000")) // to make test 
deterministic for HOODIE_AVRO_DEFAULT payload
           df.write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
@@ -733,6 +734,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val dfUpdate = 
spark.read.json(spark.sparkContext.parallelize(updates, 2))
             .withColumn("fare", expr("cast(fare as string)"))
             .withColumn("addColumn", lit("new"))
+            .withColumn("ts", lit("20240404000005")) // to make test 
deterministic for HOODIE_AVRO_DEFAULT payload
           dfUpdate.drop("begin_lat").write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
@@ -763,6 +765,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val dfOverWrite = spark.
             read.json(spark.sparkContext.parallelize(overwrite, 2)).
             filter("partitionpath = 'americas/united_states/san_francisco'")
+            .withColumn("ts", lit("20240404000010")) // to make test 
deterministic for HOODIE_AVRO_DEFAULT payload
             .withColumn("fare", expr("cast(fare as string)")) // fare now in 
table is string type, we forbid convert string to double.
           dfOverWrite.write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
@@ -779,7 +782,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           spark.read.format("hudi").load(tablePath).show(false)
 
           val updatesAgain = 
QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
-          val dfAgain = 
spark.read.json(spark.sparkContext.parallelize(updatesAgain, 
2)).withColumn("fare", expr("cast(fare as string)"))
+          val dfAgain = 
spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).
+            withColumn("fare", expr("cast(fare as string)")).
+            withColumn("ts", lit("20240404000015")) // to make test 
deterministic for HOODIE_AVRO_DEFAULT payload
           dfAgain.write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").

Reply via email to