This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4be388cd4272dff429531c70aaef48c49ba34bc4 Author: Lin Liu <[email protected]> AuthorDate: Tue Apr 8 00:25:59 2025 -0700 [HUDI-9258] Disable partial update for CUSTOM merge mode (#13092) (cherry picked from commit b28d7884971997f9892b868d63ced49d06335cb4) --- .../hudi/command/MergeIntoHoodieTableCommand.scala | 29 ++++++++++++- .../analysis/TestMergeIntoHoodieTableCommand.scala | 27 ++++++++++++ .../hudi/dml/TestPartialUpdateForMergeInto.scala | 48 ++++++++++++++++++++++ 3 files changed, 102 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index f6221abe849..31f2d9cdbf6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -506,9 +506,14 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie ENABLE_MERGE_INTO_PARTIAL_UPDATES.key, ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean && updatingActions.nonEmpty + // Partial update is enabled only for table version >= 8 && (parameters.getOrElse(HoodieWriteConfig.WRITE_TABLE_VERSION.key, HoodieTableVersion.current().versionCode().toString).toInt >= HoodieTableVersion.EIGHT.versionCode()) - && !useGlobalIndex(parameters)) + // Partial update is disabled when global index is used. + // After HUDI-9257 is done, we can remove this limitation. + && !useGlobalIndex(parameters) + // Partial update is disabled when custom merge mode is set. + && !useCustomMergeMode(parameters)) } private def getOperationType(parameters: Map[String, String]) = { @@ -760,6 +765,17 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie classOf[SqlKeyGenerator].getCanonicalName } + val mergeMode = if (tableConfig.getTableVersion.lesserThan(HoodieTableVersion.EIGHT)) { + val inferredMergeConfigs = HoodieTableConfig.inferCorrectMergingBehavior( + tableConfig.getRecordMergeMode, + tableConfig.getPayloadClass, + tableConfig.getRecordMergeStrategyId, + tableConfig.getPreCombineField, + tableConfig.getTableVersion) + inferredMergeConfigs.getLeft.name() + } else { + tableConfig.getRecordMergeMode.name() + } val overridingOpts = Map( "path" -> path, RECORDKEY_FIELD.key -> tableConfig.getRawRecordKeyFieldProp, @@ -782,7 +798,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL, PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, RECORD_MERGE_IMPL_CLASSES.key -> classOf[HoodieAvroRecordMerger].getName, - HoodieWriteConfig.RECORD_MERGE_MODE.key() -> RecordMergeMode.CUSTOM.name(), + HoodieWriteConfig.RECORD_MERGE_MODE.key() -> mergeMode, RECORD_MERGE_STRATEGY_ID.key() -> HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID, // NOTE: We have to explicitly override following configs to make sure no schema validation is performed @@ -1095,6 +1111,15 @@ object MergeIntoHoodieTableCommand { parameters.getOrElse(config.key, config.defaultValue().toString).toBoolean }.getOrElse(false) } + + def useCustomMergeMode(parameters: Map[String, String]): Boolean = { + val mergeModeOpt = parameters.get(DataSourceWriteOptions.RECORD_MERGE_MODE.key) + // For table version >= 8, mergeMode should exist. + if (mergeModeOpt.isEmpty) { + throw new HoodieException("Merge mode cannot be null here") + } + mergeModeOpt.get.equals(RecordMergeMode.CUSTOM.name) + } } object PartialAssignmentMode extends Enumeration { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala index 9f55f449452..2de3c170d5e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestMergeIntoHoodieTableCommand.scala @@ -19,7 +19,10 @@ package org.apache.spark.sql.hudi.analysis +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.config.RecordMergeMode import org.apache.hudi.config.{HoodieHBaseIndexConfig, HoodieIndexConfig} +import org.apache.hudi.exception.HoodieException import org.apache.hudi.index.HoodieIndex import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand @@ -78,4 +81,28 @@ class TestMergeIntoHoodieTableCommand extends AnyFlatSpec with Matchers { MergeIntoHoodieTableCommand.useGlobalIndex(parameters) should be(false) } } + + it should "return true for CUSTOM merge mode" in { + val params = Map( + DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.CUSTOM.name + ) + MergeIntoHoodieTableCommand.useCustomMergeMode(params) should be(true) + } + + it should "return false for non-CUSTOM merge mode" in { + var params = Map( + DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name + ) + MergeIntoHoodieTableCommand.useCustomMergeMode(params) should be(false) + + params = Map( + DataSourceWriteOptions.RECORD_MERGE_MODE.key -> RecordMergeMode.EVENT_TIME_ORDERING.name + ) + MergeIntoHoodieTableCommand.useCustomMergeMode(params) should be(false) + } + + it should "throw HoodieException when merge mode is missing" in { + val params = Map.empty[String, String] + an[HoodieException] should be thrownBy MergeIntoHoodieTableCommand.useCustomMergeMode(params) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala index 70c81dd2a26..7d508d5ba0a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala @@ -594,6 +594,54 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase { ) } + test("Test MergeInto Partial Updates should fail with CUSTOM payload and merge mode") { + withTempDir { tmp => + withSQLConf( + "hoodie.index.type" -> "GLOBAL_SIMPLE", + "hoodie.write.record.merge.mode" -> "CUSTOM", + "hoodie.datasource.write.payload.class" -> "org.apache.hudi.common.testutils.reader.HoodieRecordTestPayload") { + val tableName = generateTableName + spark.sql( + s""" + | CREATE TABLE $tableName ( + | record_key STRING, + | name STRING, + | age INT, + | department STRING, + | salary DOUBLE, + | ts BIGINT + | ) USING hudi + | PARTITIONED BY (department) + | LOCATION '${tmp.getCanonicalPath}' + | TBLPROPERTIES ( + | type = 'mor', + | primaryKey = 'record_key', + | preCombineField = 'ts')""".stripMargin) + + spark.sql( + s""" + | INSERT INTO $tableName + | SELECT * FROM ( + | SELECT 'emp_001' as record_key, 'John Doe' as name, 30 as age, + | 'Sales' as department, 80000.0 as salary, 1598886000 as ts + | UNION ALL + | SELECT 'emp_002', 'Jane Smith', 28, 'Sales', 75000.0, 1598886001 + | UNION ALL + | SELECT 'emp_003', 'Bob Wilson', 35, 'Marketing', 85000.0, 1598886002 + |)""".stripMargin) + + val failedToResolveError = "MERGE INTO field resolution error: No matching assignment found for target table" + checkExceptionContain( + s""" + |merge into $tableName t0 + |using ( SELECT 'emp_001' as record_key, 'John Doe' as name, 35 as age, cast(1598886200 as BIGINT) as ts) s0 + |on t0.record_key = s0.record_key + |when matched then update set age = s0.age + """.stripMargin)(failedToResolveError) + } + } + } + def validateLogBlock(basePath: String, expectedNumLogFile: Int, changedFields: Seq[Seq[String]],
