This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5a13ce10535 [HUDI-8468] use payload to do merging inside the
expression payload (#12726)
5a13ce10535 is described below
commit 5a13ce1053524d79c84b3004199b65e679545f8e
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Tue Jan 28 21:00:17 2025 -0800
[HUDI-8468] use payload to do merging inside the expression payload
(#12726)
---
.../apache/hudi/common/util/HoodieRecordUtils.java | 5 +
.../hudi/command/MergeIntoHoodieTableCommand.scala | 5 +
.../hudi/command/payload/ExpressionPayload.scala | 52 ++++--
.../spark/sql/hudi/dml/TestMergeIntoTable2.scala | 174 ++++++++++++++++++++-
.../hudi/dml/TestMergeModeEventTimeOrdering.scala | 3 +-
5 files changed, 228 insertions(+), 11 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 601aceceb76..c50a0c29769 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.OperationModeAwareness;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +115,10 @@ public class HoodieRecordUtils {
}
}
+ public static <T extends HoodieRecordPayload> T loadPayload(String
recordPayloadClass, GenericRecord record, Comparable orderingValue) {
+ return HoodieRecordUtils.loadPayload(recordPayloadClass, new Object[]
{record, orderingValue}, GenericRecord.class, Comparable.class);
+ }
+
public static boolean recordTypeCompatibleEngine(HoodieRecordType
recordType, EngineType engineType) {
return engineType == EngineType.SPARK && recordType ==
HoodieRecordType.SPARK;
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f05e60f8eb7..f90fbef8e82 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -474,6 +474,11 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
PAYLOAD_EXPECTED_COMBINED_SCHEMA ->
encodeAsBase64String(toStructType(joinedExpectedOutput))
)
+ // Append original payload class
+ writeParams ++= Seq(
+ PAYLOAD_ORIGINAL_AVRO_PAYLOAD ->
hoodieCatalogTable.tableConfig.getPayloadClass
+ )
+
val (success, _, _, _, _, _) =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append,
writeParams, sourceDF)
if (!success) {
throw new HoodieException("Merge into Hoodie table command failed")
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index b251813ddc2..de5a219c83e 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -23,12 +23,12 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils.{isNullable, resolveNullableSchema}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodiePayloadProps, HoodieRecord}
-import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils,
ValidationUtils, Option => HOption}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodiePayloadProps, HoodieRecord, HoodieRecordPayload,
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils,
HoodieRecordUtils, StringUtils, ValidationUtils, Option => HOption}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
-
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
@@ -44,7 +44,6 @@ import org.apache.spark.sql.types.{BooleanType, StructType}
import java.nio.ByteBuffer
import java.util.{Base64, Objects, Properties}
import java.util.function.{Function, Supplier}
-
import scala.collection.JavaConverters._
/**
@@ -132,12 +131,10 @@ class ExpressionPayload(@transient record: GenericRecord,
.serialize(resultingRow)
.asInstanceOf[GenericRecord]
- if (targetRecord.isEmpty ||
needUpdatingPersistedRecord(targetRecord.get, resultingAvroRecord, properties))
{
- resultRecordOpt = HOption.of(resultingAvroRecord)
+ resultRecordOpt = if (targetRecord.isEmpty) {
+ HOption.of(resultingAvroRecord)
} else {
- // if the PreCombine field value of targetRecord is greater
- // than the new incoming record, just keep the old record value.
- resultRecordOpt = HOption.of(targetRecord.get)
+ doRecordMerge(resultingAvroRecord, targetRecord.get, writerSchema,
properties)
}
}
}
@@ -164,6 +161,38 @@ class ExpressionPayload(@transient record: GenericRecord,
}
}
+ private def doRecordMerge(incomingRecord: GenericRecord,
+ existingRecord: IndexedRecord,
+ schema: Schema,
+ properties: Properties): HOption[IndexedRecord] = {
+ val originalPayload = properties.getProperty(PAYLOAD_ORIGINAL_AVRO_PAYLOAD)
+ if
(originalPayload.equals(classOf[OverwriteWithLatestAvroPayload].getName)) {
+ // If it is the overwrite payload, then always pick the incoming record.
+ HOption.of(incomingRecord)
+ } else if
(originalPayload.equals(classOf[DefaultHoodieRecordPayload].getName)) {
+ // If it is the default payload, then pick based on the comparison result.
+ if (needUpdatingPersistedRecord(existingRecord, incomingRecord,
properties)) {
+ HOption.of(incomingRecord)
+ } else {
+ HOption.of(existingRecord)
+ }
+ } else {
+ // For customized payload, create the payload class and merge.
+ val orderingField = ConfigUtils.getOrderingField(properties)
+ if (StringUtils.isNullOrEmpty(orderingField)) {
+ HOption.of(incomingRecord)
+ } else {
+ val consistentLogicalTimestampEnabled = properties.getProperty(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key,
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue).toBoolean
+ val incomingRecordPayload =
HoodieRecordUtils.loadPayload(originalPayload, incomingRecord,
+ HoodieAvroUtils.getNestedFieldVal(incomingRecord, orderingField,
true, consistentLogicalTimestampEnabled)
+ .asInstanceOf[Comparable[_]]).asInstanceOf[HoodieRecordPayload[_
<: HoodieRecordPayload[_]]]
+ incomingRecordPayload.combineAndGetUpdateValue(existingRecord, schema,
properties)
+ }
+ }
+ }
+
/**
* Holding wrapper record providing for lazy conversion into Catalyst's
[[InternalRow]] from Avro
*
@@ -314,6 +343,11 @@ object ExpressionPayload {
*/
val PAYLOAD_RECORD_AVRO_SCHEMA = "hoodie.payload.record.schema"
+ /**
+ * Original record payload
+ */
+ val PAYLOAD_ORIGINAL_AVRO_PAYLOAD = "hoodie.payload.original.avro.payload"
+
/**
* Property associated w/ expected combined schema of the joined records of
the source (incoming batch)
* and target (existing) tables
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index 86169cb97a1..1e504fecd0c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.hudi.dml
-import org.apache.hudi.AutoRecordKeyGenerationUtils.getClass
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.common.config.RecordMergeMode
import
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
@@ -1216,4 +1216,176 @@ class TestMergeIntoTable2 extends
HoodieSparkSqlTestBase {
}
})
}
+
+ test("Test MergeInto with commit time/event time ordering coverage") {
+ Seq("cow", "mor").foreach { tableType =>
+ withTempDir { tmp =>
+ Seq(RecordMergeMode.COMMIT_TIME_ORDERING.name(),
+ RecordMergeMode.EVENT_TIME_ORDERING.name()).foreach {
recordMergeMode =>
+ val sourceTable = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $sourceTable (
+ | id INT,
+ | name STRING,
+ | price INT,
+ | ts BIGINT
+ |) USING hudi
+ | tblproperties (
+ | type = '$tableType'
+ | )
+ |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ | INSERT INTO $sourceTable
+ | VALUES (1, 'John Doe', 19, 1),
+ | (4, 'Alice Johnson', 49, 2)
+ |""".stripMargin)
+
+ val targetTable = generateTableName
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | id INT,
+ | name STRING,
+ | price INT,
+ | ts BIGINT
+ |) using hudi
+ |TBLPROPERTIES (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | recordMergeMode = '$recordMergeMode'
+ | )
+ |LOCATION '${tmp.getCanonicalPath}/$targetTable'
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |INSERT INTO $targetTable
+ |SELECT id, name, price, ts
+ |FROM (
+ | SELECT 1 as id, 'John Doe' as name, 19 as price,
1598886001 as ts
+ | UNION ALL
+ | SELECT 2, 'Jane Doe', 24, 1598972400
+ | UNION ALL
+ | SELECT 3, 'Bob Smith', 14, 1599058800
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |MERGE INTO $targetTable t
+ |USING $sourceTable s
+ |ON t.price = s.price
+ |WHEN MATCHED THEN UPDATE SET
+ | t.id = s.id,
+ | t.name = s.name,
+ | t.price = s.price,
+ | t.ts = s.ts
+ |WHEN NOT MATCHED THEN INSERT
+ | (id, name, price, ts)
+ |VALUES
+ | (s.id, s.name, s.price, s.ts)
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY
id")(
+ Seq(1, "John Doe", 19, if (recordMergeMode ==
RecordMergeMode.EVENT_TIME_ORDERING.name()) 1598886001L else 1L),
+ Seq(2, "Jane Doe", 24, 1598972400L),
+ Seq(3, "Bob Smith", 14, 1599058800L),
+ Seq(4, "Alice Johnson", 49, 2L))
+ }
+ }
+ }
+ }
+
+ test("Test MergeInto with CUSTOM merge mode using FirstValueAvroPayload") {
+ withRecordType()(withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val sourceTable = generateTableName
+ spark.sql(
+ s"""
+ |CREATE TABLE $sourceTable (
+ | id INT,
+ | name STRING,
+ | price INT,
+ | ts BIGINT
+ |) USING hudi
+ | tblproperties (
+ | type = '$tableType'
+ | )
+ |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
+ |""".stripMargin)
+
+ // Insert source data with same ts=1598886001 for id=1
+ spark.sql(
+ s"""
+ | INSERT INTO $sourceTable
+ | VALUES (1, 'John Doe Updated', 19, 1598886001),
+ | (2, 'Jane Doe Updated', 24, 1598972401),
+ | (4, 'Alice Johnson', 49, 2)
+ |""".stripMargin)
+
+ val targetTable = generateTableName
+ spark.sql(
+ s"""
+ |create table $targetTable (
+ | id INT,
+ | name STRING,
+ | price INT,
+ | ts BIGINT
+ |) using hudi
+ |TBLPROPERTIES (
+ | type = 'cow',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | recordMergeMode = '${RecordMergeMode.CUSTOM.name()}',
+ |
'hoodie.datasource.write.payload.class'='org.apache.hudi.common.model.FirstValueAvroPayload',
+ | hoodie.datasource.write.recordkey.field = 'id'
+ | )
+ |LOCATION '${tmp.getCanonicalPath}/$targetTable'
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |INSERT INTO $targetTable
+ |SELECT id, name, price, ts
+ |FROM (
+ | SELECT 1 as id, 'John Doe Initial' as name, 19 as price,
1598886001 as ts
+ | UNION ALL
+ | SELECT 2, 'Jane Doe', 24, 1598972400
+ | UNION ALL
+ | SELECT 3, 'Bob Smith', 14, 1599058800
+ |)
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |MERGE INTO $targetTable t
+ |USING $sourceTable s
+ |ON t.price = s.price
+ |WHEN MATCHED THEN UPDATE SET
+ | t.id = s.id,
+ | t.name = s.name,
+ | t.price = s.price,
+ | t.ts = s.ts
+ |WHEN NOT MATCHED THEN INSERT
+ | (id, name, price, ts)
+ |VALUES
+ | (s.id, s.name, s.price, s.ts)
+ |""".stripMargin)
+
+ // Verify FirstValueAvroPayload behavior:
+ // - For id=1: keeps first value ("John Doe Initial") since timestamps
are equal
+ // - For id=2: takes the updated value since the incoming ts is greater
+ // - For id=4: inserts new record normally
+ checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY
id")(
+ Seq(1, "John Doe Initial", 19, 1598886001L), //
FirstValueAvroPayload keeps first record
+ Seq(2, "Jane Doe Updated", 24, 1598972401L),
+ Seq(3, "Bob Smith", 14, 1599058800L),
+ Seq(4, "Alice Johnson", 49, 2L))
+ }
+ })
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index f284c24b258..f0ffbc3a38a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -173,7 +173,8 @@ class TestMergeModeEventTimeOrdering extends
HoodieSparkSqlTestBase {
}
Seq("mor").foreach { tableType =>
- // [HUDI-8915]: COW MIT delete does not honor event time ordering.
+ // [HUDI-8915]: COW MIT delete does not honor event time ordering. For
updates, coverage is provided in
+ // "Test MergeInto with commit time/event time ordering coverage".
// Seq("cow", "mor").foreach { tableType =>
test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType
table") {
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" ->
"0") {