This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit dec9c8ed2adc035ee2c80b0fafbc4e21b08eef65
Author: vamsikarnika <[email protected]>
AuthorDate: Sun Oct 26 20:03:23 2025 +0530

    fix: Fix the instant time issue for row writer bulk insert hoodie streamer (#14153)
    
    
    ---------
    
    Co-authored-by: Vamsi <[email protected]>
---
 .../org/apache/hudi/common/util/CommitUtils.java   |  2 +-
 .../apache/hudi/common/util/TestCommitUtils.java   | 35 ++++++++++++++++++++++
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  4 +--
 .../DatasetBucketRescaleCommitActionExecutor.java  |  4 +--
 .../DatasetBulkInsertCommitActionExecutor.java     |  4 +--
 ...setBulkInsertOverwriteCommitActionExecutor.java |  4 +--
 ...lkInsertOverwriteTableCommitActionExecutor.java |  4 +--
 ...eamerDatasetBulkInsertCommitActionExecutor.java |  4 +--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 12 ++++----
 .../apache/hudi/utilities/streamer/StreamSync.java | 10 +++++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 26 +++++++++++++---
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 20 +++++++------
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  2 +-
 13 files changed, 96 insertions(+), 35 deletions(-)
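
The gist of the fix: each dataset bulk-insert executor used to mint its own instant inside preExecute() via writeClient.startCommit(), so in the HoodieStreamer row-writer path the instant used for the write could diverge from the one StreamSync had already allocated. After this patch the caller starts the commit exactly once and passes the instant time into the executor constructor. A minimal sketch of the new calling pattern, assembled from the diff below (writeConfig/writeClient setup is elided; operationType, df, and isTablePartitioned stand in for the caller's actual values):

    // The caller resolves the commit action type and starts the commit once.
    String commitActionType = CommitUtils.getCommitActionType(operationType, writeConfig.getTableType());
    String instantTime = writeClient.startCommit(commitActionType);

    // The instant is injected instead of being created inside preExecute().
    BaseDatasetBulkInsertCommitActionExecutor executor =
        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime);
    executor.execute(df, isTablePartitioned).getWriteStatuses();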

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index 780874a04664..e75e0a0b728a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -59,7 +59,7 @@ public class CommitUtils {
    */
   public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
     if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
-        || operation == WriteOperationType.DELETE_PARTITION) {
+        || operation == WriteOperationType.DELETE_PARTITION || operation == WriteOperationType.BUCKET_RESCALE) {
       return HoodieTimeline.REPLACE_COMMIT_ACTION;
     } else {
       return getCommitActionType(tableType);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
index 506d3c0b75b2..14255024800a 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
@@ -35,6 +35,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -152,6 +154,39 @@ public class TestCommitUtils {
         Option.empty(), CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY, ID3));
   }
 
+  @ParameterizedTest
+  @EnumSource(
+      value = WriteOperationType.class,
+      names = {"INSERT_OVERWRITE", "INSERT_OVERWRITE_TABLE", "DELETE_PARTITION", "BUCKET_RESCALE"}
+  )
+  public void testSpecialOverwriteOperationsReturnReplaceCommit(WriteOperationType op) {
+    String resultCOW = CommitUtils.getCommitActionType(op, HoodieTableType.COPY_ON_WRITE);
+    String resultMOR = CommitUtils.getCommitActionType(op, HoodieTableType.MERGE_ON_READ);
+
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, resultCOW);
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, resultMOR);
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = WriteOperationType.class,
+      mode = EnumSource.Mode.EXCLUDE,
+      names = {"INSERT_OVERWRITE", "INSERT_OVERWRITE_TABLE", "DELETE_PARTITION", "BUCKET_RESCALE"}
+  )
+  public void testNormalOperationsDelegateBasedOnTableType(WriteOperationType op) {
+    String resultCOW = CommitUtils.getCommitActionType(op, HoodieTableType.COPY_ON_WRITE);
+    String resultMOR = CommitUtils.getCommitActionType(op, HoodieTableType.MERGE_ON_READ);
+
+    assertEquals(HoodieActiveTimeline.COMMIT_ACTION, resultCOW);
+    assertEquals(HoodieActiveTimeline.DELTA_COMMIT_ACTION, resultMOR);
+  }
+
+  @Test
+  public void testNullOperationFallsBackToTableType() {
+    String result = CommitUtils.getCommitActionType(null, HoodieTableType.COPY_ON_WRITE);
+    assertEquals(HoodieActiveTimeline.COMMIT_ACTION, result);
+  }
+
   private HoodieWriteStat createWriteStat(String partition, String fileId) {
     HoodieWriteStat writeStat1 = new HoodieWriteStat();
     writeStat1.setPartitionPath(partition);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 7612ad95fe3c..194c837f3d7c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -61,13 +61,13 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
   protected HoodieTable table;
 
   public BaseDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
-                                                   SparkRDDWriteClient writeClient) {
+                                                   SparkRDDWriteClient writeClient, String instantTime) {
     this.writeConfig = config;
     this.writeClient = writeClient;
+    this.instantTime = instantTime;
   }
 
   protected void preExecute() {
-    instantTime = writeClient.startCommit(getCommitActionType());
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
     table.validateInsertSchema();
     writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index a5bf6013c52a..c18bf38a8ba7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -46,8 +46,8 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
   private final int bucketNumber;
 
   public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
-                                                  SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                                  SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
     expression = config.getBucketIndexPartitionExpression();
     rule = config.getBucketIndexPartitionRuleType();
     bucketNumber = config.getBucketIndexNumBuckets();
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index 8b124de565ca..9a499ca562e3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -31,8 +31,8 @@ import java.util.Map;
 public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
   public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
-                                               SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                               SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index 325b1334027a..4f9c16edcc88 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -43,8 +43,8 @@ import java.util.stream.Collectors;
 public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
   public DatasetBulkInsertOverwriteCommitActionExecutor(HoodieWriteConfig config,
-                                                        SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                                        SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
index 4dc82a34064d..d35325360745 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
@@ -35,8 +35,8 @@ import java.util.Map;
 public class DatasetBulkInsertOverwriteTableCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor {
 
   public DatasetBulkInsertOverwriteTableCommitActionExecutor(HoodieWriteConfig config,
-                                                             SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                                             SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index 8e183629f48b..9858901ad5a6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -34,8 +34,8 @@ import java.util.Map;
  */
 public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
-  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 02546c243458..dbcd065552f0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -817,23 +817,25 @@ class HoodieSparkSqlWriterInternal {
     val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
+
+    val commitActionType = CommitUtils.getCommitActionType(overwriteOperationType, writeConfig.getTableType)
+    val instantTime = writeClient.startCommit(commitActionType)
     val executor = mode match {
       case _ if overwriteOperationType == null =>
         // Don't need to overwrite
-        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime)
       case SaveMode.Append if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE =>
         // INSERT OVERWRITE PARTITION uses Append mode
-        new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient, instantTime)
       case SaveMode.Append if overwriteOperationType == WriteOperationType.BUCKET_RESCALE =>
-        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient, instantTime)
       case SaveMode.Overwrite if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE_TABLE =>
-        new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient, instantTime)
       case _ =>
         throw new HoodieException(s"$mode with bulk_insert in row writer path is not supported yet");
     }
 
     val writeResult = executor.execute(df, tableConfig.isTablePartitioned)
-    val instantTime = executor.getInstantTime
 
     try {
       val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 9b4498045649..5985abcf5423 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -810,9 +810,13 @@ public class StreamSync implements Serializable, Closeable {
   private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
     HoodieConfig hoodieConfig = new HoodieConfig(HoodieStreamer.Config.getProps(conf, cfg));
     hoodieConfig.setValue(DataSourceWriteOptions.TABLE_TYPE(), cfg.tableType);
-    hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), cfg.payloadClassName);
+    if (cfg.payloadClassName != null) {
+      hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), cfg.payloadClassName);
+    }
     hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_MODE().key(), cfg.recordMergeMode.name());
-    hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID().key(), cfg.recordMergeStrategyId);
+    if (cfg.recordMergeStrategyId != null) {
+      hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID().key(), cfg.recordMergeStrategyId);
+    }
     hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
     hoodieConfig.setValue("path", cfg.targetBasePath);
     return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
@@ -944,7 +948,7 @@ public class StreamSync implements Serializable, Closeable {
     if (useRowWriter) {
       Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
-      BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient);
+      BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
       TypedProperties mergeProps = ConfigUtils.getMergeProps(props, metaClient.getTableConfig());
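
A second, smaller part of the fix sits in prepareHoodieConfigForRowWriter() above: cfg.payloadClassName and cfg.recordMergeStrategyId may legitimately be null once merging configs are inferred per table version (see the inferMergingConfigsForWrites call in the test base below), so they are only applied when present. The same guard restated as a standalone sketch; setIfPresent is a hypothetical helper name used only for illustration, and the diff shows only the guard, not setValue's behavior on null:

    // Hypothetical helper mirroring the null-guards added above:
    // optional configs are skipped entirely when the value is unset.
    static void setIfPresent(HoodieConfig hoodieConfig, String key, String value) {
      if (value != null) {
        hoodieConfig.setValue(key, value);
      }
    }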
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 8a0add29b516..ca3edb212195 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -406,13 +406,20 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
                                          String propsFileName, String parquetSourceRoot, boolean addCommonProps,
                                          String partitionPath, String emptyBatchParam, boolean skipRecordKeyField) throws IOException {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
-        partitionPath, emptyBatchParam, null, skipRecordKeyField);
+        partitionPath, emptyBatchParam, null, skipRecordKeyField, false);
+  }
+
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                         String propsFileName, String parquetSourceRoot, boolean addCommonProps,
+                                         String partitionPath, String emptyBatchParam, boolean skipRecordKeyField, boolean useRowWriter) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        partitionPath, emptyBatchParam, null, skipRecordKeyField, useRowWriter);
   }
 
   protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps,
                                        String partitionPath, String emptyBatchParam, TypedProperties extraProps,
-                                         boolean skipRecordKeyField) throws IOException {
+                                         boolean skipRecordKeyField, boolean useRowWriter) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = TypedProperties.copy(extraProps);
 
@@ -424,6 +431,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
+    if (useRowWriter) {
+      parquetProps.setProperty("hoodie.streamer.write.row.writer.enable", "true");
+    }
     if (!skipRecordKeyField) {
       parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     }
@@ -626,10 +636,18 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
           tableType, sourceOrderingField, checkpoint, false);
     }
 
+    public static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
+                                                        List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+                                                        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
+                                                        String checkpoint, boolean allowCommitOnNoCheckpointChange) {
+      return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName,
+          tableType, sourceOrderingField, checkpoint, allowCommitOnNoCheckpointChange, HoodieTableVersion.current());
+    }
+
     static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
                                                  List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
                                                  int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
-                                                 String checkpoint, boolean allowCommitOnNoCheckpointChange) {
+                                                 String checkpoint, boolean allowCommitOnNoCheckpointChange, HoodieTableVersion tableVersion) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -652,7 +670,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
       Triple<RecordMergeMode, String, String> mergeCfgs =
           HoodieTableConfig.inferMergingConfigsForWrites(
               cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
-              HoodieTableVersion.current());
+              tableVersion);
       cfg.recordMergeMode = mergeCfgs.getLeft();
       cfg.payloadClassName = mergeCfgs.getMiddle();
       cfg.recordMergeStrategyId = mergeCfgs.getRight();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4d6729bd2da4..42f5aab7559a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1163,7 +1163,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     extraProps.setProperty("hoodie.datasource.write.table.type", 
"MERGE_ON_READ");
     extraProps.setProperty("hoodie.datasource.compaction.async.enable", 
"false");
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", 
PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false);
+        PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false, 
false);
     String tableBasePath = basePath + "test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config deltaCfg = 
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
ParquetDFSSource.class.getName(),
         null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -1835,34 +1835,36 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testBulkInsertRowWriterMultiBatches(true, null);
   }
 
-  @Test
-  public void testBulkInsertRowWriterWithSchemaProviderAndTransformer() throws Exception {
-    testBulkInsertRowWriterMultiBatches(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT", "NINE"})
+  public void testBulkInsertRowWriterWithSchemaProviderAndTransformer(HoodieTableVersion tableVersion) throws Exception {
+    testBulkInsertRowWriterMultiBatches(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), false, tableVersion);
   }
 
   @Test
   public void testBulkInsertRowWriterForEmptyBatch() throws Exception {
-    testBulkInsertRowWriterMultiBatches(false, null, true);
+    testBulkInsertRowWriterMultiBatches(false, null, true, HoodieTableVersion.current());
   }
 
   private void testBulkInsertRowWriterMultiBatches(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
-    testBulkInsertRowWriterMultiBatches(useSchemaProvider, transformerClassNames, false);
+    testBulkInsertRowWriterMultiBatches(useSchemaProvider, transformerClassNames, false, HoodieTableVersion.current());
   }
 
-  private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
+  private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, HoodieTableVersion hoodieTableVersion) throws Exception {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
     int parquetRecordsCount = 100;
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", "");
+        PARQUET_SOURCE_ROOT, false, "partition_path", "", false, true);
 
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT, testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
             : ParquetDFSSource.class.getName(),
         transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
-        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+        useSchemaProvider, 100000, false, null, null, "timestamp", null, false, hoodieTableVersion);
     cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
+    cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + hoodieTableVersion.versionCode());
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamer.sync();
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 620d9d23ac6b..d1caf5058ecb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -215,7 +215,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
           transformerClassNames, PROPS_FILENAME_TEST_AVRO_KAFKA, false,  useSchemaProvider, 100000, false, null, tableType, "timestamp", null);
     } else {
       prepareParquetDFSSource(false, hasTransformer, sourceSchemaFile, targetSchemaFile, PROPS_FILENAME_TEST_PARQUET,
-          PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false);
+          PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false, false);
       cfg = TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
           transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
           useSchemaProvider, dfsSourceLimitBytes, false, null, tableType, "timestamp", null);
