This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b57f1eb8ab [HUDI-8835] Support partial update in COMMIT_TIME_ORDERING
merge mode (#13263)
6b57f1eb8ab is described below
commit 6b57f1eb8abe862595b607d210003b239b2b0578
Author: Jon Vexler <[email protected]>
AuthorDate: Thu May 15 15:26:14 2025 -0400
[HUDI-8835] Support partial update in COMMIT_TIME_ORDERING merge mode
(#13263)
Co-authored-by: Jonathan Vexler <=>
---
.../org/apache/hudi/DefaultSparkRecordMerger.java | 60 ++-----------
.../org/apache/hudi/HoodieSparkRecordMerger.java | 44 ++++++++++
.../hudi/OverwriteWithLatestSparkRecordMerger.java | 12 +++
.../hudi/dml/TestPartialUpdateForMergeInto.scala | 99 ++++++++++++++++++++--
4 files changed, 152 insertions(+), 63 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
index 652b016cf50..7ac55ea81e4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
@@ -21,11 +21,9 @@ package org.apache.hudi;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.merge.SparkRecordMergingUtils;
@@ -45,34 +43,11 @@ public class DefaultSparkRecordMerger extends
HoodieSparkRecordMerger {
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
- ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.SPARK);
- ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.SPARK);
-
- if (newer instanceof HoodieSparkRecord) {
- HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
- if (newSparkRecord.isDelete(newSchema, props)) {
- // Delete record
- return Option.empty();
- }
- } else {
- if (newer.getData() == null) {
- // Delete record
- return Option.empty();
- }
+ Option<Pair<HoodieRecord, Schema>> deleteHandlingResult =
handleDeletes(older, oldSchema, newer, newSchema, props);
+ if (deleteHandlingResult != null) {
+ return deleteHandlingResult;
}
- if (older instanceof HoodieSparkRecord) {
- HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
- if (oldSparkRecord.isDelete(oldSchema, props)) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- } else {
- if (older.getData() == null) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- }
if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
return Option.of(Pair.of(older, oldSchema));
} else {
@@ -82,34 +57,11 @@ public class DefaultSparkRecordMerger extends
HoodieSparkRecordMerger {
@Override
public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older,
Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema,
TypedProperties props) throws IOException {
- ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.SPARK);
- ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.SPARK);
-
- if (newer instanceof HoodieSparkRecord) {
- HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
- if (newSparkRecord.isDelete(newSchema, props)) {
- // Delete record
- return Option.empty();
- }
- } else {
- if (newer.getData() == null) {
- // Delete record
- return Option.empty();
- }
+ Option<Pair<HoodieRecord, Schema>> deleteHandlingResult =
handleDeletes(older, oldSchema, newer, newSchema, props);
+ if (deleteHandlingResult != null) {
+ return deleteHandlingResult;
}
- if (older instanceof HoodieSparkRecord) {
- HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
- if (oldSparkRecord.isDelete(oldSchema, props)) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- } else {
- if (older.getData() == null) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- }
if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
return Option.of(SparkRecordMergingUtils.mergePartialRecords(
(HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older,
oldSchema, readerSchema, props));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 79e2708f9eb..77f696170f1 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -19,12 +19,56 @@
package org.apache.hudi;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
public abstract class HoodieSparkRecordMerger implements HoodieRecordMerger {
@Override
public HoodieRecord.HoodieRecordType getRecordType() {
return HoodieRecord.HoodieRecordType.SPARK;
}
+
+  /**
+   * Basic handling of deletes that is used by many of the Spark mergers.
+   * Returns {@code null} if merger-specific logic should be used.
+   */
+ protected Option<Pair<HoodieRecord, Schema>> handleDeletes(HoodieRecord
older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties
props) {
+ ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecord.HoodieRecordType.SPARK);
+ ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecord.HoodieRecordType.SPARK);
+
+ if (newer instanceof HoodieSparkRecord) {
+ HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+ if (newSparkRecord.isDelete(newSchema, props)) {
+ // Delete record
+ return Option.empty();
+ }
+ } else {
+ if (newer.getData() == null) {
+ // Delete record
+ return Option.empty();
+ }
+ }
+
+ if (older instanceof HoodieSparkRecord) {
+ HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
+ if (oldSparkRecord.isDelete(oldSchema, props)) {
+ // use natural order for delete record
+ return Option.of(Pair.of(newer, newSchema));
+ }
+ } else {
+ if (older.getData() == null) {
+ // use natural order for delete record
+ return Option.of(Pair.of(newer, newSchema));
+ }
+ }
+
+ return null;
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
index 403f392ef1a..95cc8909dc9 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/OverwriteWithLatestSparkRecordMerger.java
@@ -21,8 +21,10 @@ package org.apache.hudi;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.merge.SparkRecordMergingUtils;
import org.apache.avro.Schema;
@@ -42,4 +44,14 @@ public class OverwriteWithLatestSparkRecordMerger extends
HoodieSparkRecordMerge
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
return Option.of(Pair.of(newer, newSchema));
}
+
+ @Override
+ public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older,
Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema,
TypedProperties props) throws IOException {
+ Option<Pair<HoodieRecord, Schema>> deleteHandlingResult =
handleDeletes(older, oldSchema, newer, newSchema, props);
+ if (deleteHandlingResult != null) {
+ return deleteHandlingResult;
+ }
+ return Option.of(SparkRecordMergingUtils.mergePartialRecords(
+ (HoodieSparkRecord) older, oldSchema, (HoodieSparkRecord) newer,
newSchema, readerSchema, props));
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 7d508d5ba0a..867a032f7de 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.dml
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig,
RecordMergeMode}
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.{HoodieTableMetaClient,
HoodieTableVersion, TableSchemaResolver}
import org.apache.hudi.common.table.log.HoodieLogFileReader
@@ -46,14 +46,26 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
testPartialUpdate("cow", "avro")
}
+ test("Test partial update with COW and Avro log format and commit time
ordering") {
+ testPartialUpdate("cow", "avro", commitTimeOrdering = true)
+ }
+
test("Test partial update with MOR and Avro log format") {
testPartialUpdate("mor", "avro")
}
+ test("Test partial update with MOR and avro log format and commit time
ordering") {
+ testPartialUpdate("mor", "avro", commitTimeOrdering = true)
+ }
+
test("Test partial update with MOR and Parquet log format") {
testPartialUpdate("mor", "parquet")
}
+ test("Test partial update with MOR and Parquet log format and commit time
ordering") {
+ testPartialUpdate("mor", "parquet", commitTimeOrdering = true)
+ }
+
test("Test partial update and insert with COW and Avro log format") {
testPartialUpdateWithInserts("cow", "avro")
}
@@ -62,10 +74,18 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
testPartialUpdateWithInserts("mor", "avro")
}
+ test("Test partial update and insert with MOR and Avro log format and commit
time ordering") {
+ testPartialUpdateWithInserts("mor", "avro", commitTimeOrdering = true)
+ }
+
test("Test partial update and insert with MOR and Parquet log format") {
testPartialUpdateWithInserts("mor", "parquet")
}
+ test("Test partial update and insert with MOR and Parquet log format and
commit time ordering") {
+ testPartialUpdateWithInserts("mor", "parquet", commitTimeOrdering = true)
+ }
+
test("Test partial update with schema on read enabled") {
withSQLConf(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key() ->
"true") {
try {
@@ -257,6 +277,12 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
def testPartialUpdate(tableType: String,
logDataBlockFormat: String): Unit = {
+ testPartialUpdate(tableType, logDataBlockFormat, commitTimeOrdering =
false)
+ }
+
+ def testPartialUpdate(tableType: String,
+ logDataBlockFormat: String,
+ commitTimeOrdering: Boolean): Unit = {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = tmp.getCanonicalPath + "/" + tableName
@@ -264,7 +290,17 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(s"set
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} =
$logDataBlockFormat")
spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} =
true")
+ val mergeMode = if (commitTimeOrdering) {
+ RecordMergeMode.COMMIT_TIME_ORDERING.name()
+ } else {
+ RecordMergeMode.EVENT_TIME_ORDERING.name()
+ }
+ val preCombineString = if (commitTimeOrdering) {
+ ""
+ } else {
+ "preCombineField = '_ts',"
+ }
// Create a table with five data fields
spark.sql(
s"""
@@ -278,7 +314,8 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
|tblproperties(
| type ='$tableType',
| primaryKey = 'id',
- | preCombineField = '_ts'
+ | $preCombineString
+ | recordMergeMode = '$mergeMode'
|)
|location '$basePath'
""".stripMargin)
@@ -296,21 +333,47 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(
s"""
|merge into $tableName t0
- |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as ts
+ |using ( select 1 as id, 'a1' as name, 12.0 as price, 999 as ts
|union select 3 as id, 'a3' as name, 25.0 as price, 1260 as ts) s0
|on t0.id = s0.id
|when matched then update set price = s0.price, _ts = s0.ts
|""".stripMargin)
-
validateTableSchema(tableName, structFields)
+ if (commitTimeOrdering) {
+ checkAnswer(s"select id, name, price, _ts, description from
$tableName")(
+ Seq(1, "a1", 12.0, 999, "a1: desc1"),
+ Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+ Seq(3, "a3", 25.0, 1260, "a3: desc3")
+ )
+ } else {
+ checkAnswer(s"select id, name, price, _ts, description from
$tableName")(
+ Seq(1, "a1", 10.0, 1000, "a1: desc1"),
+ Seq(2, "a2", 20.0, 1200, "a2: desc2"),
+ Seq(3, "a3", 25.0, 1260, "a3: desc3")
+ )
+ }
+ if (tableType.equals("mor")) {
+ validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")), true)
+ }
+
+ // TODO: [HUDI-9375] get rid of this update and fix the rest of the test
accordingly
+ // showcase the difference between event time and commit time ordering
+ // Partial updates using MERGE INTO statement with changed fields:
"price" and "_ts"
+ spark.sql(
+ s"""
+ |merge into $tableName t0
+ |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as ts) s0
+ |on t0.id = s0.id
+ |when matched then update set price = s0.price, _ts = s0.ts
+ |""".stripMargin)
+
checkAnswer(s"select id, name, price, _ts, description from $tableName")(
Seq(1, "a1", 12.0, 1001, "a1: desc1"),
Seq(2, "a2", 20.0, 1200, "a2: desc2"),
Seq(3, "a3", 25.0, 1260, "a3: desc3")
)
-
if (tableType.equals("mor")) {
- validateLogBlock(basePath, 1, Seq(Seq("price", "_ts")), true)
+ validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("price",
"_ts")), true)
}
// Partial updates using MERGE INTO statement with changed fields:
"description" and "_ts"
@@ -331,9 +394,9 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
)
if (tableType.equals("mor")) {
- validateLogBlock(basePath, 2, Seq(Seq("price", "_ts"), Seq("_ts",
"description")), true)
+ validateLogBlock(basePath, 3, Seq(Seq("price", "_ts"), Seq("price",
"_ts"), Seq("_ts", "description")), true)
- spark.sql(s"set
${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 3")
+ spark.sql(s"set
${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key} = 4")
// Partial updates that trigger compaction
spark.sql(
s"""
@@ -427,6 +490,12 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
def testPartialUpdateWithInserts(tableType: String,
logDataBlockFormat: String): Unit = {
+ testPartialUpdateWithInserts(tableType, logDataBlockFormat,
commitTimeOrdering = false)
+ }
+
+ def testPartialUpdateWithInserts(tableType: String,
+ logDataBlockFormat: String,
+ commitTimeOrdering: Boolean): Unit = {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = tmp.getCanonicalPath + "/" + tableName
@@ -434,6 +503,17 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
spark.sql(s"set
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
spark.sql(s"set ${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key} =
$logDataBlockFormat")
spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} =
true")
+ val mergeMode = if (commitTimeOrdering) {
+ RecordMergeMode.COMMIT_TIME_ORDERING.name()
+ } else {
+ RecordMergeMode.EVENT_TIME_ORDERING.name()
+ }
+
+ val preCombineString = if (commitTimeOrdering) {
+ ""
+ } else {
+ "preCombineField = '_ts',"
+ }
// Create a table with five data fields
spark.sql(
@@ -448,7 +528,8 @@ class TestPartialUpdateForMergeInto extends
HoodieSparkSqlTestBase {
|tblproperties(
| type ='$tableType',
| primaryKey = 'id',
- | preCombineField = '_ts'
+ | $preCombineString
+ | recordMergeMode = '$mergeMode'
|)
|location '$basePath'
""".stripMargin)