This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b57f1eb8ab [HUDI-8835] Support partial update in COMMIT_TIME_ORDERING 
merge mode (#13263)
6b57f1eb8ab is described below

commit 6b57f1eb8abe862595b607d210003b239b2b0578
Author: Jon Vexler <[email protected]>
AuthorDate: Thu May 15 15:26:14 2025 -0400

    [HUDI-8835] Support partial update in COMMIT_TIME_ORDERING merge mode 
(#13263)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../org/apache/hudi/DefaultSparkRecordMerger.java  | 60 ++-----------
 .../org/apache/hudi/HoodieSparkRecordMerger.java   | 44 ++++++++++
 .../hudi/OverwriteWithLatestSparkRecordMerger.java | 12 +++
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   | 99 ++++++++++++++++++++--
 4 files changed, 152 insertions(+), 63 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
index 652b016cf50..7ac55ea81e4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
@@ -21,11 +21,9 @@ package org.apache.hudi;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.merge.SparkRecordMergingUtils;
 
@@ -45,34 +43,11 @@ public class DefaultSparkRecordMerger extends 
HoodieSparkRecordMerger {
 
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
-    ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecordType.SPARK);
-    ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecordType.SPARK);
-
-    if (newer instanceof HoodieSparkRecord) {
-      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
-      if (newSparkRecord.isDelete(newSchema, props)) {
-        // Delete record
-        return Option.empty();
-      }
-    } else {
-      if (newer.getData() == null) {
-        // Delete record
-        return Option.empty();
-      }
+    Option<Pair<HoodieRecord, Schema>> deleteHandlingResult = 
handleDeletes(older, oldSchema, newer, newSchema, props);
+    if (deleteHandlingResult != null) {
+      return deleteHandlingResult;
     }
 
-    if (older instanceof HoodieSparkRecord) {
-      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
-      if (oldSparkRecord.isDelete(oldSchema, props)) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
-    } else {
-      if (older.getData() == null) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
-    }
     if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
       return Option.of(Pair.of(older, oldSchema));
     } else {
@@ -82,34 +57,11 @@ public class DefaultSparkRecordMerger extends 
HoodieSparkRecordMerger {
 
   @Override
   public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, 
Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, 
TypedProperties props) throws IOException {
-    ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecordType.SPARK);
-    ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecordType.SPARK);
-
-    if (newer instanceof HoodieSparkRecord) {
-      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
-      if (newSparkRecord.isDelete(newSchema, props)) {
-        // Delete record
-        return Option.empty();
-      }
-    } else {
-      if (newer.getData() == null) {
-        // Delete record
-        return Option.empty();
-      }
+    Option<Pair<HoodieRecord, Schema>> deleteHandlingResult = 
handleDeletes(older, oldSchema, newer, newSchema, props);
+    if (deleteHandlingResult != null) {
+      return deleteHandlingResult;
     }
 
-    if (older instanceof HoodieSparkRecord) {
-      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
-      if (oldSparkRecord.isDelete(oldSchema, props)) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
-    } else {
-      if (older.getData() == null) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
-    }
     if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
       return Option.of(SparkRecordMergingUtils.mergePartialRecords(
           (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, 
oldSchema, readerSchema, props));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 79e2708f9eb..77f696170f1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -19,12 +19,56 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
 
 public abstract class HoodieSparkRecordMerger implements HoodieRecordMerger {
   @Override
   public HoodieRecord.HoodieRecordType getRecordType() {
     return HoodieRecord.HoodieRecordType.SPARK;
   }
+
+  /**
+   * Basic handling of deletes that is shared by many of the Spark mergers.
+   * Returns {@code null} if merger-specific logic should be used instead.
+   */
+  protected Option<Pair<HoodieRecord, Schema>> handleDeletes(HoodieRecord 
older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties 
props) {
+    ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.SPARK);
+    ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecord.HoodieRecordType.SPARK);
+
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDelete(newSchema, props)) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {
+        // Delete record
+        return Option.empty();
+      }
+    }
+
+    if (older instanceof HoodieSparkRecord) {
+      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
+      if (oldSparkRecord.isDelete(oldSchema, props)) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    } else {
+      if (older.getData() == null) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    }
+
+    return null;
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
index 403f392ef1a..95cc8909dc9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
@@ -21,8 +21,10 @@ package org.apache.hudi;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.merge.SparkRecordMergingUtils;
 
 import org.apache.avro.Schema;
 
@@ -42,4 +44,14 @@ public class OverwriteWithLatestSparkRecordMerger extends 
HoodieSparkRecordMerge
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
     return Option.of(Pair.of(newer, newSchema));
   }
+
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, 
Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, 
TypedProperties props) throws IOException {
+    Option<Pair<HoodieRecord, Schema>> deleteHandlingResult = 
handleDeletes(older, oldSchema, newer, newSchema, props);
+    if (deleteHandlingResult != null) {
+      return deleteHandlingResult;
+    }
+    return Option.of(SparkRecordMergingUtils.mergePartialRecords(
+        (HoodieSparkRecord) older, oldSchema, (HoodieSparkRecord) newer, 
newSchema, readerSchema, props));
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 7d508d5ba0a..867a032f7de 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.dml
 
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig, 
RecordMergeMode}
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.HoodieLogFileReader
@@ -46,14 +46,26 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
     testPartialUpdate("cow", "avro")
   }
 
+  test("Test partial update with COW and Avro log format and commit time 
ordering") {
+    testPartialUpdate("cow", "avro", commitTimeOrdering = true)
+  }
+
   test("Test partial update with MOR and Avro log format") {
     testPartialUpdate("mor", "avro")
   }
 
+  test("Test partial update with MOR and avro log format and commit time 
ordering") {
+    testPartialUpdate("mor", "avro", commitTimeOrdering = true)
+  }
+
   test("Test partial update with MOR and Parquet log format") {
     testPartialUpdate("mor", "parquet")
   }
 
+  test("Test partial update with MOR and Parquet log format and commit time 
ordering") {
+    testPartialUpdate("mor", "parquet", commitTimeOrdering = true)
+  }
+
   test("Test partial update and insert with COW and Avro log format") {
     testPartialUpdateWithInserts("cow", "avro")
   }
@@ -62,10 +74,18 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
     testPartialUpdateWithInserts("mor", "avro")
   }
 
+  test("Test partial update and insert with MOR and Avro log format and commit 
time ordering") {
+    testPartialUpdateWithInserts("mor", "avro", commitTimeOrdering = true)
+  }
+
   test("Test partial update and insert with MOR and Parquet log format") {
     testPartialUpdateWithInserts("mor", "parquet")
   }
 
+  test("Test partial update and insert with MOR and Parquet log format and 
commit time ordering") {
+    testPartialUpdateWithInserts("mor", "parquet", commitTimeOrdering = true)
+  }
+
   test("Test partial update with schema on read enabled") {
     withSQLConf(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key() -> 
"true") {
       try {
@@ -257,6 +277,12 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
 
   def testPartialUpdate(tableType: String,
                         logDataBlockFormat: String): Unit = {
+    testPartialUpdate(tableType, logDataBlockFormat, commitTimeOrdering = 
false)
+  }
+
+  def testPartialUpdate(tableType: String,
+                        logDataBlockFormat: String,
+                        commitTimeOrdering: Boolean): Unit = {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
@@ -264,7 +290,17 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
       spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = 
$logDataBlockFormat")
       spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = 
true")
+      val mergeMode = if (commitTimeOrdering) {
+        RecordMergeMode.COMMIT_TIME_ORDERING.name()
+      } else {
+        RecordMergeMode.EVENT_TIME_ORDERING.name()
+      }
 
+      val preCombineString = if (commitTimeOrdering) {
+        ""
+      } else {
+        "preCombineField = '_ts',"
+      }
       // Create a table with five data fields
       spark.sql(
         s"""
@@ -278,7 +314,8 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
            |tblproperties(
            | type ='$tableType',
            | primaryKey = 'id',
-           | preCombineField = '_ts'
+           | $preCombineString
+           | recordMergeMode = '$mergeMode'
            |)
            |location '$basePath'
         """.stripMargin)
@@ -296,21 +333,47 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(
         s"""
            |merge into $tableName t0
-           |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as ts
+           |using ( select 1 as id, 'a1' as name, 12.0 as price, 999 as ts
            |union select 3 as id, 'a3' as name, 25.0 as price, 1260 as ts) s0
            |on t0.id = s0.id
            |when matched then update set price = s0.price, _ts = s0.ts
            |""".stripMargin)
-
       validateTableSchema(tableName, structFields)
+      if (commitTimeOrdering) {
+        checkAnswer(s"select id, name, price, _ts, description from 
$tableName")(
+          Seq(1, "a1", 12.0, 999, "a1: desc1"),
+          Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+          Seq(3, "a3", 25.0, 1260, "a3: desc3")
+        )
+      } else {
+        checkAnswer(s"select id, name, price, _ts, description from 
$tableName")(
+          Seq(1, "a1", 10.0, 1000, "a1: desc1"),
+          Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+          Seq(3, "a3", 25.0, 1260, "a3: desc3")
+        )
+      }
+      if (tableType.equals("mor")) {
+        validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")), true)
+      }
+
+      // TODO: [HUDI-9375] get rid of this update and fix the rest of the test 
accordingly
+      // showcase the difference between event time and commit time ordering
+      // Partial updates using MERGE INTO statement with changed fields: 
"price" and "_ts"
+      spark.sql(
+        s"""
+           |merge into $tableName t0
+           |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as ts) s0
+           |on t0.id = s0.id
+           |when matched then update set price = s0.price, _ts = s0.ts
+           |""".stripMargin)
+
       checkAnswer(s"select id, name, price, _ts, description from $tableName")(
         Seq(1, "a1", 12.0, 1001, "a1: desc1"),
         Seq(2, "a2", 20.0, 1200, "a2: desc2"),
         Seq(3, "a3", 25.0, 1260, "a3: desc3")
       )
-
       if (tableType.equals("mor")) {
-        validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")), true)
+        validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("price", 
"_ts")), true)
       }
 
       // Partial updates using MERGE INTO statement with changed fields: 
"description" and "_ts"
@@ -331,9 +394,9 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       )
 
       if (tableType.equals("mor")) {
-        validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts", 
"description")), true)
+        validateLogBlock(basePath, 3, Seq(Seq("price", "_ts"), Seq("price", 
"_ts"), Seq("_ts", "description")), true)
 
-        spark.sql(s"set 
${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 3")
+        spark.sql(s"set 
${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 4")
         // Partial updates that trigger compaction
         spark.sql(
           s"""
@@ -427,6 +490,12 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
 
   def testPartialUpdateWithInserts(tableType: String,
                                    logDataBlockFormat: String): Unit = {
+    testPartialUpdateWithInserts(tableType, logDataBlockFormat, 
commitTimeOrdering = false)
+  }
+
+  def testPartialUpdateWithInserts(tableType: String,
+                                   logDataBlockFormat: String,
+                                   commitTimeOrdering: Boolean): Unit = {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = tmp.getCanonicalPath + "/" + tableName
@@ -434,6 +503,17 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(s"set 
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
       spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} = 
$logDataBlockFormat")
       spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = 
true")
+      val mergeMode = if (commitTimeOrdering) {
+        RecordMergeMode.COMMIT_TIME_ORDERING.name()
+      } else {
+        RecordMergeMode.EVENT_TIME_ORDERING.name()
+      }
+
+      val preCombineString = if (commitTimeOrdering) {
+        ""
+      } else {
+        "preCombineField = '_ts',"
+      }
 
       // Create a table with five data fields
       spark.sql(
@@ -448,7 +528,8 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
            |tblproperties(
            | type ='$tableType',
            | primaryKey = 'id',
-           | preCombineField = '_ts'
+           | $preCombineString
+           | recordMergeMode = '$mergeMode'
            |)
            |location '$basePath'
         """.stripMargin)

Reply via email to