This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 2b81e6bf96e [HUDI-7771] Making OverwriteWithLatestPayload the default payload in 0.15.0 (#11240)
2b81e6bf96e is described below
commit 2b81e6bf96e7fd58353d3966d95a1562dc084e20
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 21:54:55 2024 -0700
[HUDI-7771] Making OverwriteWithLatestPayload the default payload in 0.15.0 (#11240)
---
.../src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java | 4 ++--
.../src/main/java/org/apache/hudi/config/HoodieWriteConfig.java | 4 ++--
.../org/apache/hudi/common/model/DefaultHoodieRecordPayload.java | 4 +---
.../apache/hudi/common/model/OverwriteWithLatestAvroPayload.java | 2 ++
.../main/java/org/apache/hudi/common/table/HoodieTableConfig.java | 4 ++--
.../scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala | 6 +++---
.../org/apache/hudi/functional/TestHiveTableSchemaEvolution.java | 3 +--
.../org/apache/hudi/functional/TestBasicSchemaEvolution.scala | 7 ++-----
.../org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala | 4 ++--
.../test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 8 ++------
10 files changed, 19 insertions(+), 27 deletions(-)
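In practical terms, this flips the 0.15.0 default merge behavior from ordering-field-based merging (DefaultHoodieRecordPayload) back to overwrite-with-latest (OverwriteWithLatestAvroPayload). A minimal writer sketch of pinning the previous default explicitly, assuming df is an input DataFrame; the table name, path, and field names are illustrative, not part of this commit:

    import org.apache.spark.sql.SaveMode

    df.write.format("hudi").
      option("hoodie.table.name", "trips").                        // hypothetical table name
      option("hoodie.datasource.write.recordkey.field", "uuid").   // hypothetical key field
      option("hoodie.datasource.write.precombine.field", "ts").    // ordering field the payload compares
      // pin the pre-0.15.0 default to retain ordering-based merging
      option("hoodie.datasource.write.payload.class",
        "org.apache.hudi.common.model.DefaultHoodieRecordPayload").
      mode(SaveMode.Append).
      save("/tmp/hudi/trips")                                      // hypothetical base path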
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 5c70000bd6c..3929dcba047 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import java.io.File;
import java.io.FileReader;
@@ -50,7 +50,7 @@ public class HoodiePayloadConfig extends HoodieConfig {
public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
.key("hoodie.compaction.payload.class")
- .defaultValue(DefaultHoodieRecordPayload.class.getName())
+ .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.markAdvanced()
.withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
+ "the record payload class to merge records in the log against each other, merge again with the base file and "
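As the documentation string above says, the compaction payload class must match the class used during insert/upserts. A small sketch of keeping the two keys aligned in one options map; the map itself is illustrative, only the config keys come from this commit:

    val payloadClass = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload"
    val opts = Map(
      "hoodie.datasource.write.payload.class" -> payloadClass, // payload used on write
      "hoodie.compaction.payload.class" -> payloadClass        // must be the same class for compaction
    )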
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c4b5be318ba..6e83af2f203 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -35,13 +35,13 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -142,7 +142,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS_NAME = ConfigProperty
.key("hoodie.datasource.write.payload.class")
- .defaultValue(DefaultHoodieRecordPayload.class.getName())
+ .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.markAdvanced()
.withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
+ "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index a3e6ce1f133..daa1dcb0207 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -37,11 +37,9 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * Default payload.
* {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue.
* <p>
- * 1. preCombine - Picks the latest delta record for a key, based on an ordering field
- * 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
+ * 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
*/
public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
public static final String METADATA_EVENT_TIME_KEY = "metadata.event_time.key";
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index dac9b828896..d9fbd4cba05 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.util.Objects;
/**
+ * Default payload.
+ *
* <ol>
* <li> preCombine - Picks the latest delta record for a key, based on an ordering field;
* <li> combineAndGetUpdateValue/getInsertValue - Simply overwrites storage with latest delta record
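The javadoc contrast above is the substance of this default change: this payload lets the latest delta record win outright, while DefaultHoodieRecordPayload consults the ordering field. A toy sketch of the two behaviors, using simplified stand-ins rather than the real Avro payload API:

    case class Rec(key: String, orderingVal: Long, data: String)

    // OverwriteWithLatestAvroPayload-style: the incoming delta record always wins
    def overwriteWithLatest(stored: Rec, incoming: Rec): Rec = incoming

    // DefaultHoodieRecordPayload-style: the higher ordering value wins
    def orderingBased(stored: Rec, incoming: Rec): Rec =
      if (incoming.orderingVal >= stored.orderingVal) incoming else stored

    val stored = Rec("k1", orderingVal = 5L, data = "current")
    val late   = Rec("k1", orderingVal = 3L, data = "late-arriving")
    overwriteWithLatest(stored, late) // late record overwrites: Rec("k1", 3, "late-arriving")
    orderingBased(stored, late)       // stored record kept:     Rec("k1", 5, "current")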
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f6dcdce1c34..d09348f1ce7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -26,12 +26,12 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.OrderedProperties;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -166,7 +166,7 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
.key("hoodie.compaction.payload.class")
- .defaultValue(DefaultHoodieRecordPayload.class.getName())
+ .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
.withDocumentation("Payload class to use for performing compactions, i.e merge delta logs with current base file and then "
+ " produce a new base file.");
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 44c6911f7d6..e1bae0dbf3c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -21,7 +21,7 @@ import org.apache.hudi.AutoRecordKeyGenerationUtils.shouldAutoGenerateRecordKeys
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, TypedProperties}
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, WriteOperationType}
+import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{ReflectionUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -105,7 +105,7 @@ trait ProvidesHoodieConfig extends Logging {
// Validate duplicate key for inserts to COW table when using strict insert mode.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
- classOf[DefaultHoodieRecordPayload].getCanonicalName
+ classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
}
@@ -279,7 +279,7 @@ trait ProvidesHoodieConfig extends Logging {
if (insertDupPolicy == FAIL_INSERT_DUP_POLICY) {
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
- classOf[DefaultHoodieRecordPayload].getCanonicalName
+ classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
}
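With the branches above now resolving to OverwriteWithLatestAvroPayload, SQL writers that relied on ordering-based merging can opt back in through the payloadClass table property, which this commit's tests also exercise. A sketch with hypothetical table and column names:

    spark.sql(
      """create table trips (uuid string, fare double, ts long) using hudi
        |tblproperties (
        |  primaryKey = 'uuid',
        |  preCombineField = 'ts',
        |  payloadClass = 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
        |)""".stripMargin)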
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index a5a45cabf81..dff9d2e9ccc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -97,8 +97,7 @@ public class TestHiveTableSchemaEvolution {
spark.sql("set hoodie.schema.on.read.enable=true");
spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
- + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1', "
- + "hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload') location '%s'",
+ + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
tableName, tableType, path));
spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index 32d9d4aa614..8e177aa7479 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -17,6 +17,7 @@
package org.apache.hudi.functional
+import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
@@ -27,18 +28,15 @@ import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectCo
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
-
-import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.util.function.Consumer
-
import scala.collection.JavaConverters._
class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAssertionSupport {
@@ -50,7 +48,6 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key() -> "true",
- HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
index 2a7de760230..31e5f96d5d8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieOptionConfig.scala
@@ -35,13 +35,13 @@ class TestHoodieOptionConfig extends SparkClientFunctionalTestHarness {
assertTrue(with1.size == 4)
assertTrue(with1("primaryKey") == "id")
assertTrue(with1("type") == "cow")
- assertTrue(with1("payloadClass") == classOf[DefaultHoodieRecordPayload].getName)
+ assertTrue(with1("payloadClass") == classOf[OverwriteWithLatestAvroPayload].getName)
assertTrue(with1("recordMergerStrategy") == HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
val ops2 = Map("primaryKey" -> "id",
"preCombineField" -> "timestamp",
"type" -> "mor",
- "payloadClass" -> classOf[OverwriteWithLatestAvroPayload].getName,
+ "payloadClass" -> classOf[DefaultHoodieRecordPayload].getName,
"recordMergerStrategy" -> HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
)
val with2 = HoodieOptionConfig.withDefaultSqlOptions(ops2)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index e5b4beb97d1..e0f4b0a83e5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -714,7 +714,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val dataGen = new QuickstartUtils.DataGenerator
val inserts = QuickstartUtils.convertToStringList(dataGen.generateInserts(10)).asScala.toSeq
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
- .withColumn("ts", lit("20240404000000")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
df.write.format("hudi").
options(QuickstartUtils.getQuickstartWriteConfigs).
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
@@ -733,7 +732,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2))
.withColumn("fare", expr("cast(fare as string)"))
.withColumn("addColumn", lit("new"))
- .withColumn("ts", lit("20240404000005")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
dfUpdate.drop("begin_lat").write.format("hudi").
options(QuickstartUtils.getQuickstartWriteConfigs).
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
@@ -764,7 +762,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val dfOverWrite = spark.
read.json(spark.sparkContext.parallelize(overwrite, 2)).
filter("partitionpath = 'americas/united_states/san_francisco'")
- .withColumn("ts", lit("20240404000010")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
.withColumn("fare", expr("cast(fare as string)")) // fare now in table is string type, we forbid convert string to double.
dfOverWrite.write.format("hudi").
options(QuickstartUtils.getQuickstartWriteConfigs).
@@ -781,9 +778,8 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.read.format("hudi").load(tablePath).show(false)
val updatesAgain = QuickstartUtils.convertToStringList(dataGen.generateUpdates(10)).asScala.toSeq
- val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).
- withColumn("fare", expr("cast(fare as string)")).
- withColumn("ts", lit("20240404000015")) // to make test determinate for HOODIE_AVRO_DEFAULT payload
+ val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2))
+ .withColumn("fare", expr("cast(fare as string)"))
dfAgain.write.format("hudi").
options(QuickstartUtils.getQuickstartWriteConfigs).
option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").