This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4be388cd4272dff429531c70aaef48c49ba34bc4
Author: Lin Liu <[email protected]>
AuthorDate: Tue Apr 8 00:25:59 2025 -0700

    [HUDI-9258] Disable partial update for CUSTOM merge mode (#13092)
    
    (cherry picked from commit b28d7884971997f9892b868d63ced49d06335cb4)
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala | 29 ++++++++++++-
 .../analysis/TestMergeIntoHoodieTableCommand.scala | 27 ++++++++++++
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   | 48 ++++++++++++++++++++++
 3 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f6221abe849..31f2d9cdbf6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -506,9 +506,14 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
       ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
       && updatingActions.nonEmpty
+      // Partial update is enabled only for table version >= 8
       && (parameters.getOrElse(HoodieWriteConfig.WRITE_TABLE_VERSION.key, 
HoodieTableVersion.current().versionCode().toString).toInt
       >= HoodieTableVersion.EIGHT.versionCode())
-      && !useGlobalIndex(parameters))
+      // Partial update is disabled when global index is used.
+      // After HUDI-9257 is done, we can remove this limitation.
+      && !useGlobalIndex(parameters)
+      // Partial update is disabled when custom merge mode is set.
+      && !useCustomMergeMode(parameters))
   }
 
   private def getOperationType(parameters: Map[String, String]) = {
@@ -760,6 +765,17 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       classOf[SqlKeyGenerator].getCanonicalName
     }
 
+    val mergeMode = if 
(tableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT)) {
+      val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
+        tableConfig.getRecordMergeMode,
+        tableConfig.getPayloadClass,
+        tableConfig.getRecordMergeStrategyId,
+        tableConfig.getPreCombineField,
+        tableConfig.getTableVersion)
+      inferredMergeConfigs.getLeft.name()
+    } else {
+      tableConfig.getRecordMergeMode.name()
+    }
     val overridingOpts = Map(
       "path" -> path,
       RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp,
@@ -782,7 +798,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
       SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
       PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
       RECORD_MERGE_IMPL_CLASSES.key -> classOf[HoodieAvroRecordMerger].getName,
-      HoodieWriteConfig.RECORD_MERGE_MODE.key() -> 
RecordMergeMode.CUSTOM.name(),
+      HoodieWriteConfig.RECORD_MERGE_MODE.key() -> mergeMode,
       RECORD_MERGE_STRATEGY_ID.key() -> 
HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID,
 
       // NOTE: We have to explicitly override following configs to make sure 
no schema validation is performed
@@ -1095,6 +1111,15 @@ object MergeIntoHoodieTableCommand {
         parameters.getOrElse(config.key, 
config.defaultValue().toString).toBoolean
     }.getOrElse(false)
   }
+
+  /** True if `parameters` select the CUSTOM merge mode (case-insensitive); throws [[HoodieException]] if unset. */
+  def useCustomMergeMode(parameters: Map[String, String]): Boolean = {
+    // For table version >= 8, mergeMode should exist; absence indicates a misconfigured write path.
+    parameters.get(DataSourceWriteOptions.RECORD_MERGE_MODE.key) match {
+      case Some(mergeMode) => mergeMode.equalsIgnoreCase(RecordMergeMode.CUSTOM.name)
+      case None => throw new HoodieException("Merge mode cannot be null here")
+    }
+  }
 }
 
 object PartialAssignmentMode extends Enumeration {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala
index 9f55f449452..2de3c170d5e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala
@@ -19,7 +19,10 @@
 
 package org.apache.spark.sql.hudi.analysis
 
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.RecordMergeMode
 import org.apache.hudi.config.{HoodieHBaseIndexConfig, HoodieIndexConfig}
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.index.HoodieIndex
 
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand
@@ -78,4 +81,28 @@ class TestMergeIntoHoodieTableCommand extends AnyFlatSpec with Matchers {
       MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false)
     }
   }
+
+  it should "return true for CUSTOM merge mode" in {
+    // A CUSTOM merge mode must be detected so MERGE INTO does not attempt partial updates.
+    val customMode = RecordMergeMode.CUSTOM.name
+    val customParams = Map(DataSourceWriteOptions.RECORD_MERGE_MODE.key -> customMode)
+    MergeIntoHoodieTableCommand.useCustomMergeMode(customParams) shouldBe true
+  }
+
+  it should "return false for non-CUSTOM merge mode" in {
+    // Only CUSTOM is reported; the ordering-based merge modes must return false.
+    val nonCustomModes = Seq(
+      RecordMergeMode.COMMIT_TIME_ORDERING,
+      RecordMergeMode.EVENT_TIME_ORDERING)
+    nonCustomModes.foreach { mode =>
+      val params =
+        Map(DataSourceWriteOptions.RECORD_MERGE_MODE.key -> mode.name)
+      MergeIntoHoodieTableCommand.useCustomMergeMode(params) should be(false)
+    }
+  }
+
+  it should "throw HoodieException when merge mode is missing" in {
+    // Without RECORD_MERGE_MODE in the parameters the helper must fail fast.
+    assertThrows[HoodieException](MergeIntoHoodieTableCommand.useCustomMergeMode(Map.empty))
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 70c81dd2a26..7d508d5ba0a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -594,6 +594,54 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     )
   }
 
+  test("Test MergeInto Partial Updates should fail with CUSTOM payload and merge mode") {
+    withTempDir { tmp =>
+      withSQLConf(
+        // Keep the default non-global index: a global index alone disables partial updates.
+        "hoodie.write.record.merge.mode" -> "CUSTOM",
+        "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload") {
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             | CREATE TABLE $tableName (
+             |   record_key STRING,
+             |   name STRING,
+             |   age INT,
+             |   department STRING,
+             |   salary DOUBLE,
+             |   ts BIGINT
+             | ) USING hudi
+             | PARTITIONED BY (department)
+             | LOCATION '${tmp.getCanonicalPath}'
+             | TBLPROPERTIES (
+             |   type = 'mor',
+             |   primaryKey = 'record_key',
+             |   preCombineField = 'ts')""".stripMargin)
+
+        spark.sql(
+          s"""
+             | INSERT INTO $tableName
+             | SELECT * FROM (
+             |   SELECT 'emp_001' as record_key, 'John Doe' as name, 30 as age,
+             |          'Sales' as department, 80000.0 as salary, 1598886000 as ts
+             |   UNION ALL
+             |   SELECT 'emp_002', 'Jane Smith', 28, 'Sales', 75000.0, 1598886001
+             |   UNION ALL
+             |   SELECT 'emp_003', 'Bob Wilson', 35, 'Marketing', 85000.0, 1598886002
+             |)""".stripMargin)
+        // With partial updates disabled, assigning only `age` is expected to fail column resolution.
+        val failedToResolveError = "MERGE INTO field resolution error: No matching assignment found for target table"
+        checkExceptionContain(
+          s"""
+             |merge into $tableName t0
+             |using ( SELECT 'emp_001' as record_key, 'John Doe' as name, 35 as age, cast(1598886200 as BIGINT) as ts) s0
+             |on t0.record_key = s0.record_key
+             |when matched then update set age = s0.age
+      """.stripMargin)(failedToResolveError)
+      }
+    }
+  }
+
   def validateLogBlock(basePath: String,
                        expectedNumLogFile: Int,
                        changedFields: Seq[Seq[String]],

Reply via email to