This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fdac3d32931 fix(spark): fix MOR bulk insert commit operation error 
(#18878)
4fdac3d32931 is described below

commit 4fdac3d32931f04f64e99b368218bd304677f130
Author: fhan <[email protected]>
AuthorDate: Mon Jun 1 17:14:44 2026 +0800

    fix(spark): fix MOR bulk insert commit operation error (#18878)
    
    * fix(spark): fix mor bulk insert commit type error
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  3 ++
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 35 +++++++++++++++++++++-
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index aaa79929256f..07058b885b62 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -1099,6 +1099,9 @@ class HoodieSparkSqlWriterInternal {
         }
     }
     val mergedParams = mutable.Map.empty ++ 
HoodieWriterUtils.parametersWithWriteDefaults(translatedOptsWithMappedTableConfig.toMap)
+    if (!mergedParams.contains(HoodieTableConfig.TYPE.key) && 
mergedParams.contains(TABLE_TYPE.key)) {
+      mergedParams(HoodieTableConfig.TYPE.key) = mergedParams(TABLE_TYPE.key)
+    }
     if (mergedParams.contains(KEYGENERATOR_CLASS_NAME.key) && 
!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_TYPE.key)) {
       mergedParams(HoodieTableConfig.KEY_GENERATOR_TYPE.key) = 
KeyGeneratorType.fromClassName(mergedParams(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key)).name
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 15f0b5d571b1..8968caaf348f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.{HoodieConfig, 
HoodieMetadataConfig, Record
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodieFileFormat, HoodieRecord, HoodieRecordPayload, 
HoodieReplaceCommitMetadata, HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.TimelineUtils
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, 
HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
@@ -389,6 +389,39 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     }
   }
 
+  /**
+   * Regression test for MOR row-writer bulk_insert commit action.
+   *
+   * The writer receives table type through the datasource option
+   * hoodie.datasource.write.table.type. mergeParamsAndGetHoodieConfig must 
also
+   * propagate it to hoodie.table.type, because HoodieWriteConfig#getTableType
+   * reads the table-config key when row-writer bulk_insert chooses the commit 
action.
+   */
+  @Test
+  def testMorRowWriterBulkInsertUsesDeltaCommitAction(): Unit = {
+    val fooTableModifier = commonTableModifier
+      .updated("hoodie.bulkinsert.shuffle.parallelism", "4")
+      .updated(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .updated(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+      // Disable async compaction to keep the timeline focused on the write 
instant; otherwise compaction instants can obscure
+      // the commit action selected by the row-writer bulk_insert path.
+      .updated(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
+
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(100)
+    val recordsSeq = convertRowListToSeq(records)
+    val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, 
df)
+
+    val metaClient = createMetaClient(spark, tempBasePath)
+    assertEquals(HoodieTableType.MERGE_ON_READ, 
metaClient.getTableConfig.getTableType)
+    val lastCompletedWrite = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+    assertEquals(HoodieTimeline.DELTA_COMMIT_ACTION, 
lastCompletedWrite.getAction)
+  }
+
   /**
    * Test case for insert dataset without ordering fields.
    */

Reply via email to