This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a13ce10535 [HUDI-8468] use payload to do merging inside the 
expression payload  (#12726)
5a13ce10535 is described below

commit 5a13ce1053524d79c84b3004199b65e679545f8e
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Tue Jan 28 21:00:17 2025 -0800

    [HUDI-8468] use payload to do merging inside the expression payload  
(#12726)
---
 .../apache/hudi/common/util/HoodieRecordUtils.java |   5 +
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   5 +
 .../hudi/command/payload/ExpressionPayload.scala   |  52 ++++--
 .../spark/sql/hudi/dml/TestMergeIntoTable2.scala   | 174 ++++++++++++++++++++-
 .../hudi/dml/TestMergeModeEventTimeOrdering.scala  |   3 +-
 5 files changed, 228 insertions(+), 11 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 601aceceb76..c50a0c29769 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.OperationModeAwareness;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 
+import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -114,6 +115,10 @@ public class HoodieRecordUtils {
     }
   }
 
+  public static <T extends HoodieRecordPayload> T loadPayload(String 
recordPayloadClass, GenericRecord record, Comparable orderingValue) {
+    return HoodieRecordUtils.loadPayload(recordPayloadClass, new Object[] 
{record, orderingValue}, GenericRecord.class, Comparable.class);
+  }
+
   public static boolean recordTypeCompatibleEngine(HoodieRecordType 
recordType, EngineType engineType) {
     return engineType == EngineType.SPARK && recordType == 
HoodieRecordType.SPARK;
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f05e60f8eb7..f90fbef8e82 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -474,6 +474,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       PAYLOAD_EXPECTED_COMBINED_SCHEMA -> 
encodeAsBase64String(toStructType(joinedExpectedOutput))
     )
 
+    // Append original payload class
+    writeParams ++= Seq(
+      PAYLOAD_ORIGINAL_AVRO_PAYLOAD -> 
hoodieCatalogTable.tableConfig.getPayloadClass
+    )
+
     val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, 
writeParams, sourceDF)
     if (!success) {
       throw new HoodieException("Merge into Hoodie table command failed")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index b251813ddc2..de5a219c83e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -23,12 +23,12 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.hudi.avro.AvroSchemaUtils.{isNullable, resolveNullableSchema}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodiePayloadProps, HoodieRecord}
-import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils, 
ValidationUtils, Option => HOption}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodiePayloadProps, HoodieRecord, HoodieRecordPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, 
HoodieRecordUtils, StringUtils, ValidationUtils, Option => HOption}
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
-
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
@@ -44,7 +44,6 @@ import org.apache.spark.sql.types.{BooleanType, StructType}
 import java.nio.ByteBuffer
 import java.util.{Base64, Objects, Properties}
 import java.util.function.{Function, Supplier}
-
 import scala.collection.JavaConverters._
 
 /**
@@ -132,12 +131,10 @@ class ExpressionPayload(@transient record: GenericRecord,
           .serialize(resultingRow)
           .asInstanceOf[GenericRecord]
 
-        if (targetRecord.isEmpty || 
needUpdatingPersistedRecord(targetRecord.get, resultingAvroRecord, properties)) 
{
-          resultRecordOpt = HOption.of(resultingAvroRecord)
+        resultRecordOpt = if (targetRecord.isEmpty) {
+          HOption.of(resultingAvroRecord)
         } else {
-          // if the PreCombine field value of targetRecord is greater
-          // than the new incoming record, just keep the old record value.
-          resultRecordOpt = HOption.of(targetRecord.get)
+          doRecordMerge(resultingAvroRecord, targetRecord.get, writerSchema, 
properties)
         }
       }
     }
@@ -164,6 +161,38 @@ class ExpressionPayload(@transient record: GenericRecord,
     }
   }
 
+  private def doRecordMerge(incomingRecord: GenericRecord,
+                            existingRecord: IndexedRecord,
+                            schema: Schema,
+                            properties: Properties): HOption[IndexedRecord] = {
+    val originalPayload = properties.getProperty(PAYLOAD_ORIGINAL_AVRO_PAYLOAD)
+    if 
(originalPayload.equals(classOf[OverwriteWithLatestAvroPayload].getName)) {
+      // If is overwrite payload, then always pick the incoming record.
+      HOption.of(incomingRecord)
+    } else if 
(originalPayload.equals(classOf[DefaultHoodieRecordPayload].getName)) {
+      // If is default payload, then pick based on comparison result.
+      if (needUpdatingPersistedRecord(existingRecord, incomingRecord, 
properties)) {
+        HOption.of(incomingRecord)
+      } else {
+        HOption.of(existingRecord)
+      }
+    } else {
+      // For customized payload, create the payload class and merge.
+      val orderingField = ConfigUtils.getOrderingField(properties)
+      if (StringUtils.isNullOrEmpty(orderingField)) {
+        HOption.of(incomingRecord)
+      } else {
+        val consistentLogicalTimestampEnabled = properties.getProperty(
+          
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key,
+          
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue).toBoolean
+        val incomingRecordPayload = 
HoodieRecordUtils.loadPayload(originalPayload, incomingRecord,
+          HoodieAvroUtils.getNestedFieldVal(incomingRecord, orderingField, 
true, consistentLogicalTimestampEnabled)
+            .asInstanceOf[Comparable[_]]).asInstanceOf[HoodieRecordPayload[_ 
<: HoodieRecordPayload[_]]]
+        incomingRecordPayload.combineAndGetUpdateValue(existingRecord, schema, 
properties)
+      }
+    }
+  }
+
   /**
    * Holding wrapper record providing for lazy conversion into Catalyst's 
[[InternalRow]] from Avro
    *
@@ -314,6 +343,11 @@ object ExpressionPayload {
    */
   val PAYLOAD_RECORD_AVRO_SCHEMA = "hoodie.payload.record.schema"
 
+  /**
+   * Original record payload
+   */
+  val PAYLOAD_ORIGINAL_AVRO_PAYLOAD = "hoodie.payload.original.avro.payload"
+
   /**
    * Property associated w/ expected combined schema of the joined records of 
the source (incoming batch)
    * and target (existing) tables
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index 86169cb97a1..1e504fecd0c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.hudi.dml
 
-import org.apache.hudi.AutoRecordKeyGenerationUtils.getClass
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.hudi.common.config.RecordMergeMode
 import 
org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
@@ -1216,4 +1216,176 @@ class TestMergeIntoTable2 extends 
HoodieSparkSqlTestBase {
       }
     })
   }
+
+  test("Test MergeInto with commit time/event time ordering coverage") {
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        Seq(RecordMergeMode.COMMIT_TIME_ORDERING.name(),
+          RecordMergeMode.EVENT_TIME_ORDERING.name()).foreach { 
recordMergeMode =>
+          val sourceTable = generateTableName
+          spark.sql(
+            s"""
+               |CREATE TABLE $sourceTable (
+               |    id INT,
+               |    name STRING,
+               |    price INT,
+               |    ts BIGINT
+               |) USING hudi
+               | tblproperties (
+               |  type = '$tableType'
+               | )
+               |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
+               |""".stripMargin)
+
+          spark.sql(
+            s"""
+               | INSERT INTO $sourceTable
+               | VALUES (1, 'John Doe', 19, 1),
+               |        (4, 'Alice Johnson', 49, 2)
+               |""".stripMargin)
+
+          val targetTable = generateTableName
+          spark.sql(
+            s"""
+               |create table $targetTable (
+               |  id INT,
+               |  name STRING,
+               |  price INT,
+               |  ts BIGINT
+               |) using hudi
+               |TBLPROPERTIES (
+               |  type = 'cow',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  recordMergeMode = '$recordMergeMode'
+               | )
+               |LOCATION '${tmp.getCanonicalPath}/$targetTable'
+               |""".stripMargin)
+
+          spark.sql(
+            s"""
+               |INSERT INTO $targetTable
+               |SELECT id, name, price, ts
+               |FROM (
+               |    SELECT 1 as id, 'John Doe' as name, 19 as price, 
1598886001 as ts
+               |     UNION ALL
+               |     SELECT 2, 'Jane Doe', 24, 1598972400
+               |     UNION ALL
+               |     SELECT 3, 'Bob Smith', 14, 1599058800
+               |)
+               |""".stripMargin)
+
+          spark.sql(
+            s"""
+               |MERGE INTO $targetTable t
+               |USING $sourceTable s
+               |ON t.price = s.price
+               |WHEN MATCHED THEN UPDATE SET
+               |    t.id = s.id,
+               |    t.name = s.name,
+               |    t.price = s.price,
+               |    t.ts = s.ts
+               |WHEN NOT MATCHED THEN INSERT
+               |    (id, name, price, ts)
+               |VALUES
+               |    (s.id, s.name, s.price, s.ts)
+               |""".stripMargin)
+
+          checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY 
id")(
+            Seq(1, "John Doe", 19, if (recordMergeMode == 
RecordMergeMode.EVENT_TIME_ORDERING.name()) 1598886001L else 1L),
+            Seq(2, "Jane Doe", 24, 1598972400L),
+            Seq(3, "Bob Smith", 14, 1599058800L),
+            Seq(4, "Alice Johnson", 49, 2L))
+        }
+      }
+    }
+  }
+
+  test("Test MergeInto with CUSTOM merge mode using FirstValueAvroPayload") {
+    withRecordType()(withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val sourceTable = generateTableName
+        spark.sql(
+          s"""
+            |CREATE TABLE $sourceTable (
+            |    id INT,
+            |    name STRING,
+            |    price INT,
+            |    ts BIGINT
+            |) USING hudi
+            | tblproperties (
+            |  type = '$tableType'
+            | )
+            |LOCATION '${tmp.getCanonicalPath}/$sourceTable'
+            |""".stripMargin)
+
+        // Insert source data with same ts=1598886001 for id=1
+        spark.sql(
+          s"""
+            | INSERT INTO $sourceTable
+            | VALUES (1, 'John Doe Updated', 19, 1598886001),
+            |        (2, 'Jane Doe Updated', 24, 1598972401),
+            |        (4, 'Alice Johnson', 49, 2)
+            |""".stripMargin)
+
+        val targetTable = generateTableName
+        spark.sql(
+          s"""
+            |create table $targetTable (
+            |  id INT,
+            |  name STRING,
+            |  price INT,
+            |  ts BIGINT
+            |) using hudi
+            |TBLPROPERTIES (
+            |  type = 'cow',
+            |  primaryKey = 'id',
+            |  preCombineField = 'ts',
+            |  recordMergeMode = '${RecordMergeMode.CUSTOM.name()}',
+            |  
'hoodie.datasource.write.payload.class'='org.apache.hudi.common.model.FirstValueAvroPayload',
+            |  hoodie.datasource.write.recordkey.field = 'id'
+            | )
+            |LOCATION '${tmp.getCanonicalPath}/$targetTable'
+            |""".stripMargin)
+
+        spark.sql(
+          s"""
+            |INSERT INTO $targetTable
+            |SELECT id, name, price, ts
+            |FROM (
+            |    SELECT 1 as id, 'John Doe Initial' as name, 19 as price, 
1598886001 as ts
+            |     UNION ALL
+            |     SELECT 2, 'Jane Doe', 24, 1598972400
+            |     UNION ALL
+            |     SELECT 3, 'Bob Smith', 14, 1599058800
+            |)
+            |""".stripMargin)
+
+        spark.sql(
+          s"""
+            |MERGE INTO $targetTable t
+            |USING $sourceTable s
+            |ON t.price = s.price
+            |WHEN MATCHED THEN UPDATE SET
+            |    t.id = s.id,
+            |    t.name = s.name,
+            |    t.price = s.price,
+            |    t.ts = s.ts
+            |WHEN NOT MATCHED THEN INSERT
+            |    (id, name, price, ts)
+            |VALUES
+            |    (s.id, s.name, s.price, s.ts)
+            |""".stripMargin)
+
+        // Verify FirstValueAvroPayload behavior:
+        // - For id=1: keeps first value ("John Doe Initial") since timestamps 
are equal
+        // - For id=4: inserts new record normally
+        checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY 
id")(
+          Seq(1, "John Doe Initial", 19, 1598886001L), // 
FirstValueAvroPayload keeps first record
+          Seq(2, "Jane Doe Updated", 24, 1598972401L),
+          Seq(3, "Bob Smith", 14, 1599058800L),
+          Seq(4, "Alice Johnson", 49, 2L))
+      }
+    })
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
index f284c24b258..f0ffbc3a38a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeEventTimeOrdering.scala
@@ -173,7 +173,8 @@ class TestMergeModeEventTimeOrdering extends 
HoodieSparkSqlTestBase {
   }
 
   Seq("mor").foreach { tableType =>
-    // [HUDI-8915]: COW MIT delete does not honor event time ordering.
+    // [HUDI-8915]: COW MIT delete does not honor event time ordering. For 
update we have the coverage in
+    // "Test MergeInto with commit time/event time ordering coverage".
     //  Seq("cow", "mor").foreach { tableType =>
     test(s"Test merge operations with EVENT_TIME_ORDERING for $tableType 
table") {
       
withSparkSqlSessionConfig("hoodie.merge.small.file.group.candidates.limit" -> 
"0") {

Reply via email to