This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4fdac3d32931 fix(spark): fix MOR bulk insert commit operation error
(#18878)
4fdac3d32931 is described below
commit 4fdac3d32931f04f64e99b368218bd304677f130
Author: fhan <[email protected]>
AuthorDate: Mon Jun 1 17:14:44 2026 +0800
fix(spark): fix MOR bulk insert commit operation error (#18878)
* fix(spark): fix mor bulk insert commit type error
---------
Co-authored-by: fhan <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 ++
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 35 +++++++++++++++++++++-
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index aaa79929256f..07058b885b62 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -1099,6 +1099,9 @@ class HoodieSparkSqlWriterInternal {
}
}
val mergedParams = mutable.Map.empty ++
HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)
+ if (!mergedParams.contains(HoodieTableConfig.TYPE.key) &&
mergedParams.contains(TABLE_TYPE.key)) {
+ mergedParams(HoodieTableConfig.TYPE.key) = mergedParams(TABLE_TYPE.key)
+ }
if (mergedParams.contains(KEYGENERATOR_CLASS_NAME.key) &&
!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_TYPE.key)) {
mergedParams(HoodieTableConfig.KEY_GENERATOR_TYPE.key) =
KeyGeneratorType.fromClassName(mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)).name
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 15f0b5d571b1..8968caaf348f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.{HoodieConfig,
HoodieMetadataConfig, Record
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieFileFormat, HoodieRecord, HoodieRecordPayload,
HoodieReplaceCommitMetadata, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.TimelineUtils
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig,
HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
@@ -389,6 +389,39 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
}
}
+  /**
+   * Regression test for MOR row-writer bulk_insert commit action.
+   *
+   * The writer receives table type through the datasource option
+   * hoodie.datasource.write.table.type. mergeParamsAndGetHoodieConfig must also
+   * propagate it to hoodie.table.type, because HoodieWriteConfig#getTableType
+   * reads the table-config key when row-writer bulk_insert chooses the commit action.
+   *
+   * The test writes a single row-writer bulk_insert batch into a MERGE_ON_READ table and
+   * asserts that the last completed instant on the commits timeline is a delta commit.
+   */
+  @Test
+  def testMorRowWriterBulkInsertUsesDeltaCommitAction(): Unit = {
+    val fooTableModifier = commonTableModifier
+      .updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+      .updated(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+      // Keep the timeline focused on the write instant; otherwise compaction instants can obscure
+      // the commit action selected by the row-writer bulk_insert path. Async compaction is therefore
+      // explicitly disabled for this write.
+      .updated(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(100)
+    val recordsSeq = convertRowListToSeq(records)
+    val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df)
+
+    val metaClient = createMetaClient(spark, tempBasePath)
+    // The table must have been created as MOR for the commit-action assertion to be meaningful.
+    assertEquals(HoodieTableType.MERGE_ON_READ, metaClient.getTableConfig.getTableType)
+    val lastCompletedWrite =
+      metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, lastCompletedWrite.getAction)
+  }
+
+
/**
* Test case for insert dataset without ordering fields.
*/