This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 2b81e6bf96e [HUDI-7771] Making OverwriteWithLatestPayload as default payload in 0.15.0 (#11240)
2b81e6bf96e is described below

commit 2b81e6bf96e7fd58353d3966d95a1562dc084e20
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 21:54:55 2024 -0700

    [HUDI-7771] Making OverwriteWithLatestPayload as default payload in 0.15.0 (#11240)
---
 .../src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java | 4 ++--
 .../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java   | 4 ++--
 .../org/apache/hudi/common/model/DefaultHoodieRecordPayload.java  | 4 +---
 .../apache/hudi/common/model/OverwriteWithLatestAvroPayload.java  | 2 ++
 .../main/java/org/apache/hudi/common/table/HoodieTableConfig.java | 4 ++--
 .../scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala    | 6 +++---
 .../org/apache/hudi/functional/TestHiveTableSchemaEvolution.java  | 3 +--
 .../org/apache/hudi/functional/TestBasicSchemaEvolution.scala     | 7 ++-----
 .../org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala | 4 ++--
 .../test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  | 8 ++------
 10 files changed, 19 insertions(+), 27 deletions(-)

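In practice, this change makes OverwriteWithLatestAvroPayload (latest write wins
outright) the 0.15.0 default again, whereas DefaultHoodieRecordPayload additionally
honors the precombine/ordering field when merging updates. Jobs that depend on
event-time ordering can keep the old behavior by pinning the payload class
explicitly. A minimal Spark Scala sketch, assuming a running SparkSession; the
table name, columns, and path below are placeholders, not from this commit:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("payload-pin").getOrCreate()
    import spark.implicits._

    // Toy frame; "uuid", "ts", and "partition" are hypothetical column names.
    val df = Seq(("k1", 5L, "p0", "late-arriving")).toDF("uuid", "ts", "partition", "value")

    df.write.format("hudi")
      .option(HoodieWriteConfig.TBL_NAME.key, "trips")                // hypothetical table name
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "uuid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")      // ordering field the payload compares
      // Same key as hoodie.datasource.write.payload.class; restores the pre-revert default.
      .option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key,
        "org.apache.hudi.common.model.DefaultHoodieRecordPayload")
      .mode("append")
      .save("/tmp/hudi/trips")                                        // hypothetical path

Spark SQL users can set the equivalent "payloadClass" table option, as exercised
in the TestHoodieOptionConfig changes below.
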
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 5c70000bd6c..3929dcba047 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 
 import java.io.File;
 import java.io.FileReader;
@@ -50,7 +50,7 @@ public class HoodiePayloadConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.compaction.payload.class")
-      .defaultValue(DefaultHoodieRecordPayload.class.getName())
+      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
       .markAdvanced()
       .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
         + "the record payload class to merge records in the log against each other, merge again with the base file and "
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c4b5be318ba..6e83af2f203 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -35,13 +35,13 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FileSystemRetryConfig;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -142,7 +142,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.datasource.write.payload.class")
-      .defaultValue(DefaultHoodieRecordPayload.class.getName())
+      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
       .markAdvanced()
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index a3e6ce1f133..daa1dcb0207 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -37,11 +37,9 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * Default payload.
  * {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue.
  * <p>
- * 1. preCombine - Picks the latest delta record for a key, based on an ordering field
- * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
  */
 public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
   public static final String METADATA_EVENT_TIME_KEY = "metadata.event_time.key";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index dac9b828896..d9fbd4cba05 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 /**
+ * Default payload.
+ *
  * <ol>
  * <li> preCombine - Picks the latest delta record for a key, based on an ordering field;
  * <li> combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f6dcdce1c34..d09348f1ce7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -26,12 +26,12 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.OrderedProperties;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -166,7 +166,7 @@ public class HoodieTableConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
       .key("hoodie.compaction.payload.class")
-      .defaultValue(DefaultHoodieRecordPayload.class.getName())
+      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
       .withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
           + " produce a new base file.");
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 44c6911f7d6..e1bae0dbf3c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, WriteOperationType}
+import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -105,7 +105,7 @@ trait ProvidesHoodieConfig extends Logging {
       // Validate duplicate key for inserts to COW table when using strict insert mode.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
-      classOf[DefaultHoodieRecordPayload].getCanonicalName
+      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
   }
 
@@ -279,7 +279,7 @@ trait ProvidesHoodieConfig extends Logging {
       if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) {
         classOf[ValidateDuplicateKeyPayload].getCanonicalName
       } else {
-        classOf[DefaultHoodieRecordPayload].getCanonicalName
+        classOf[OverwriteWithLatestAvroPayload].getCanonicalName
       }
     }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index a5a45cabf81..dff9d2e9ccc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -97,8 +97,7 @@ public class TestHiveTableSchemaEvolution {
 
       spark.sql("set hoodie.schema.on.read.enable=true");
       spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
-              + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1', "
-              + "hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'",
+              + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
           tableName, tableType, path));
       spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
       spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index 32d9d4aa614..8e177aa7479 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
@@ -27,18 +28,15 @@ import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectCo
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
-
-import org.apache.hadoop.fs.FileSystem
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
 import java.util.function.Consumer
-
 import scala.collection.JavaConverters._
 
 class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
@@ -50,7 +48,6 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
     "hoodie.bulkinsert.shuffle.parallelism" -> "2",
     "hoodie.delete.shuffle.parallelism" -> "1",
     HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
-    HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
     DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
     DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
index 2a7de760230..31e5f96d5d8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
@@ -35,13 +35,13 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
     assertTrue(with1.size == 4)
     assertTrue(with1("primaryKey") == "id")
     assertTrue(with1("type") == "cow")
-    assertTrue(with1("payloadClass") == classOf[DefaultHoodieRecordPayload].getName)
+    assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
     assertTrue(with1("recordMergerStrategy") == HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
 
     val ops2 = Map("primaryKey" -> "id",
       "preCombineField" -> "timestamp",
       "type" -> "mor",
-      "payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName,
+      "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
       "recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
     )
     val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index e5b4beb97d1..e0f4b0a83e5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -714,7 +714,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val dataGen = new QuickstartUtils.DataGenerator
           val inserts = QuickstartUtils.convertToStringList(dataGen.generateInserts(10)).asScala.toSeq
           val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
-            .withColumn("ts", lit("20240404000000")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
           df.write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
@@ -733,7 +732,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2))
             .withColumn("fare", expr("cast(fare as string)"))
             .withColumn("addColumn", lit("new"))
-            .withColumn("ts", lit("20240404000005")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
           dfUpdate.drop("begin_lat").write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
@@ -764,7 +762,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           val dfOverWrite = spark.
             read.json(spark.sparkContext.parallelize(overwrite, 2)).
             filter("partitionpath = 'americas/united_states/san_francisco'")
-            .withColumn("ts", lit("20240404000010")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
             .withColumn("fare", expr("cast(fare as string)")) // fare now in table is string type, we forbid convert string to double.
           dfOverWrite.write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
@@ -781,9 +778,8 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           spark.read.format("hudi").load(tablePath).show(false)
 
           val updatesAgain = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10)).asScala.toSeq
-          val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).
-            withColumn("fare", expr("cast(fare as string)")).
-            withColumn("ts", lit("20240404000015")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
+          val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2))
+            .withColumn("fare", expr("cast(fare as string)"))
           dfAgain.write.format("hudi").
             options(QuickstartUtils.getQuickstartWriteConfigs).
             option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").

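For readers skimming the hunks above: per the javadocs touched in this commit,
OverwriteWithLatestAvroPayload (the new default) only consults the ordering field
in preCombine, and combineAndGetUpdateValue simply overwrites storage with the
latest delta record; DefaultHoodieRecordPayload honors the ordering field in both
paths. A toy stand-in sketch of that difference for updates (simplified types,
not the real payload classes, and tie-breaking elided):

    // Simplified stand-ins; "ts" plays the role of the precombine/ordering field.
    case class Rec(key: String, ts: Long, value: String)

    // OverwriteWithLatestAvroPayload-style update: the incoming record always wins.
    def overwriteWithLatest(stored: Rec, incoming: Rec): Rec = incoming

    // DefaultHoodieRecordPayload-style update: the ordering field decides.
    def orderingAware(stored: Rec, incoming: Rec): Rec =
      if (incoming.ts > stored.ts) incoming else stored

    val stored   = Rec("k1", ts = 10L, value = "newer")
    val incoming = Rec("k1", ts = 5L, value = "late-arriving")

    assert(overwriteWithLatest(stored, incoming).value == "late-arriving") // new default behavior
    assert(orderingAware(stored, incoming).value == "newer")               // previous default behavior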