This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dec9c8ed2adc035ee2c80b0fafbc4e21b08eef65
Author: vamsikarnika <[email protected]>
AuthorDate: Sun Oct 26 20:03:23 2025 +0530

    fix: Fix the instant time issue for row writer bulk insert hoodie streamer (#14153)

    ---------

    Co-authored-by: Vamsi <[email protected]>
---
 .../org/apache/hudi/common/util/CommitUtils.java   |  2 +-
 .../apache/hudi/common/util/TestCommitUtils.java   | 35 ++++++++++++++++++++++
 .../BaseDatasetBulkInsertCommitActionExecutor.java |  4 +--
 .../DatasetBucketRescaleCommitActionExecutor.java  |  4 +--
 .../DatasetBulkInsertCommitActionExecutor.java     |  4 +--
 ...setBulkInsertOverwriteCommitActionExecutor.java |  4 +--
 ...lkInsertOverwriteTableCommitActionExecutor.java |  4 +--
 ...eamerDatasetBulkInsertCommitActionExecutor.java |  4 +--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 12 ++++----
 .../apache/hudi/utilities/streamer/StreamSync.java | 10 +++++--
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 26 +++++++++++++---
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 20 +++++++------
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  2 +-
 13 files changed, 96 insertions(+), 35 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
index 780874a04664..e75e0a0b728a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java
@@ -59,7 +59,7 @@ public class CommitUtils {
    */
   public static String getCommitActionType(WriteOperationType operation, HoodieTableType tableType) {
     if (operation == WriteOperationType.INSERT_OVERWRITE || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
-        || operation == WriteOperationType.DELETE_PARTITION) {
+        || operation == WriteOperationType.DELETE_PARTITION || operation == WriteOperationType.BUCKET_RESCALE) {
       return HoodieTimeline.REPLACE_COMMIT_ACTION;
     } else {
       return getCommitActionType(tableType);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
index 506d3c0b75b2..14255024800a 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCommitUtils.java
@@ -35,6 +35,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -152,6 +154,39 @@ public class TestCommitUtils {
         Option.empty(), CommitUtils.getValidCheckpointForCurrentWriter(timeline, SINK_CHECKPOINT_KEY, ID3));
   }
 
+  @ParameterizedTest
+  @EnumSource(
+      value = WriteOperationType.class,
+      names = {"INSERT_OVERWRITE", "INSERT_OVERWRITE_TABLE", "DELETE_PARTITION", "BUCKET_RESCALE"}
+  )
+  public void testSpecialOverwriteOperationsReturnReplaceCommit(WriteOperationType op) {
+    String resultCOW = CommitUtils.getCommitActionType(op, HoodieTableType.COPY_ON_WRITE);
+    String resultMOR = CommitUtils.getCommitActionType(op, HoodieTableType.MERGE_ON_READ);
+
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, resultCOW);
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, resultMOR);
+  }
+
+  @ParameterizedTest
+  @EnumSource(
+      value = WriteOperationType.class,
+      mode = EnumSource.Mode.EXCLUDE,
+      names = {"INSERT_OVERWRITE", "INSERT_OVERWRITE_TABLE", "DELETE_PARTITION", "BUCKET_RESCALE"}
+  )
+  public void testNormalOperationsDelegateBasedOnTableType(WriteOperationType op) {
+    String resultCOW = CommitUtils.getCommitActionType(op, HoodieTableType.COPY_ON_WRITE);
+    String resultMOR = CommitUtils.getCommitActionType(op, HoodieTableType.MERGE_ON_READ);
+
+    assertEquals(HoodieActiveTimeline.COMMIT_ACTION, resultCOW);
+    assertEquals(HoodieActiveTimeline.DELTA_COMMIT_ACTION, resultMOR);
+  }
+
+  @Test
+  public void testNullOperationFallsBackToTableType() {
+    String result = CommitUtils.getCommitActionType(null, HoodieTableType.COPY_ON_WRITE);
+    assertEquals(HoodieActiveTimeline.COMMIT_ACTION, result);
+  }
+
   private HoodieWriteStat createWriteStat(String partition, String fileId) {
     HoodieWriteStat writeStat1 = new HoodieWriteStat();
     writeStat1.setPartitionPath(partition);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index 7612ad95fe3c..194c837f3d7c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -61,13 +61,13 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
   protected HoodieTable table;
 
   public BaseDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
-                                                   SparkRDDWriteClient writeClient) {
+                                                   SparkRDDWriteClient writeClient, String instantTime) {
     this.writeConfig = config;
     this.writeClient = writeClient;
+    this.instantTime = instantTime;
   }
 
   protected void preExecute() {
-    instantTime = writeClient.startCommit(getCommitActionType());
     table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
     table.validateInsertSchema();
     writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index a5bf6013c52a..c18bf38a8ba7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -46,8 +46,8 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
   private final int bucketNumber;
 
   public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
-                                                  SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                                  SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
     expression = config.getBucketIndexPartitionExpression();
     rule = config.getBucketIndexPartitionRuleType();
     bucketNumber = config.getBucketIndexNumBuckets();
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index 8b124de565ca..9a499ca562e3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -31,8 +31,8 @@ import java.util.Map;
 public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
   public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
-                                               SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                               SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index 325b1334027a..4f9c16edcc88 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -43,8 +43,8 @@ import java.util.stream.Collectors;
 public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
   public DatasetBulkInsertOverwriteCommitActionExecutor(HoodieWriteConfig config,
-                                                        SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                                        SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
index 4dc82a34064d..d35325360745 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
@@ -35,8 +35,8 @@ import java.util.Map;
 public class DatasetBulkInsertOverwriteTableCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor {
 
   public DatasetBulkInsertOverwriteTableCommitActionExecutor(HoodieWriteConfig config,
-                                                             SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+                                                             SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index 8e183629f48b..9858901ad5a6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -34,8 +34,8 @@ import java.util.Map;
  */
 public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
 
-  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, SparkRDDWriteClient writeClient) {
-    super(config, writeClient);
+  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, SparkRDDWriteClient writeClient, String instantTime) {
+    super(config, writeClient, instantTime);
   }
 
   @Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 02546c243458..dbcd065552f0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -817,23 +817,25 @@ class HoodieSparkSqlWriterInternal {
     val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
+
+    val commitActionType = CommitUtils.getCommitActionType(overwriteOperationType, writeConfig.getTableType)
+    val instantTime = writeClient.startCommit(commitActionType)
     val executor = mode match {
       case _ if overwriteOperationType == null =>
         // Don't need to overwrite
-        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime)
       case SaveMode.Append if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE =>
         // INSERT OVERWRITE PARTITION uses Append mode
-        new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient, instantTime)
       case SaveMode.Append if overwriteOperationType == WriteOperationType.BUCKET_RESCALE =>
-        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient, instantTime)
       case SaveMode.Overwrite if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE_TABLE =>
-        new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient)
+        new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient, instantTime)
       case _ =>
         throw new HoodieException(s"$mode with bulk_insert in row writer path is not supported yet");
     }
     val writeResult = executor.execute(df, tableConfig.isTablePartitioned)
-    val instantTime = executor.getInstantTime
 
     try {
       val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 9b4498045649..5985abcf5423 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -810,9 +810,13 @@ public class StreamSync implements Serializable, Closeable {
   private HoodieWriteConfig prepareHoodieConfigForRowWriter(Schema writerSchema) {
     HoodieConfig hoodieConfig = new HoodieConfig(HoodieStreamer.Config.getProps(conf, cfg));
     hoodieConfig.setValue(DataSourceWriteOptions.TABLE_TYPE(), cfg.tableType);
-    hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), cfg.payloadClassName);
+    if (cfg.payloadClassName != null) {
+      hoodieConfig.setValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(), cfg.payloadClassName);
+    }
     hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_MODE().key(), cfg.recordMergeMode.name());
-    hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID().key(), cfg.recordMergeStrategyId);
+    if (cfg.recordMergeStrategyId != null) {
+      hoodieConfig.setValue(DataSourceWriteOptions.RECORD_MERGE_STRATEGY_ID().key(), cfg.recordMergeStrategyId);
+    }
     hoodieConfig.setValue(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName(props));
     hoodieConfig.setValue("path", cfg.targetBasePath);
     return HoodieSparkSqlWriter.getBulkInsertRowConfig(writerSchema != InputBatch.NULL_SCHEMA ? Option.of(writerSchema) : Option.empty(),
@@ -944,7 +948,7 @@ public class StreamSync implements Serializable, Closeable {
     if (useRowWriter) {
       Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
-      BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient);
+      BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
       TypedProperties mergeProps = ConfigUtils.getMergeProps(props, metaClient.getTableConfig());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 8a0add29b516..ca3edb212195 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -406,13 +406,20 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
                                           String propsFileName, String parquetSourceRoot, boolean addCommonProps,
                                           String partitionPath, String emptyBatchParam, boolean skipRecordKeyField) throws IOException {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
-        partitionPath, emptyBatchParam, null, skipRecordKeyField);
+        partitionPath, emptyBatchParam, null, skipRecordKeyField, false);
+  }
+
+  protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                         String propsFileName, String parquetSourceRoot, boolean addCommonProps,
+                                         String partitionPath, String emptyBatchParam, boolean skipRecordKeyField, boolean useRowWriter) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        partitionPath, emptyBatchParam, null, skipRecordKeyField, useRowWriter);
   }
 
   protected void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                          String propsFileName, String parquetSourceRoot, boolean addCommonProps,
                                          String partitionPath, String emptyBatchParam, TypedProperties extraProps,
-                                         boolean skipRecordKeyField) throws IOException {
+                                         boolean skipRecordKeyField, boolean useRowWriter) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = TypedProperties.copy(extraProps);
@@ -424,6 +431,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
+    if (useRowWriter) {
+      parquetProps.setProperty("hoodie.streamer.write.row.writer.enable", "true");
+    }
     if (!skipRecordKeyField) {
       parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     }
@@ -626,10 +636,18 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
         tableType, sourceOrderingField, checkpoint, false);
   }
 
+  public static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
+      List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+      int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
+      String checkpoint, boolean allowCommitOnNoCheckpointChange) {
+    return makeConfig(basePath, op, sourceClassName, transformerClassNames, propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, updatePayloadClass, payloadClassName,
+        tableType, sourceOrderingField, checkpoint, allowCommitOnNoCheckpointChange, HoodieTableVersion.current());
+  }
+
   static HoodieDeltaStreamer.Config makeConfig(String basePath, WriteOperationType op, String sourceClassName,
       List<String> transformerClassNames, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
       int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType, String sourceOrderingField,
-      String checkpoint, boolean allowCommitOnNoCheckpointChange) {
+      String checkpoint, boolean allowCommitOnNoCheckpointChange, HoodieTableVersion tableVersion) {
     HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
     cfg.targetBasePath = basePath;
     cfg.targetTableName = "hoodie_trips";
@@ -652,7 +670,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     Triple<RecordMergeMode, String, String> mergeCfgs = HoodieTableConfig.inferMergingConfigsForWrites(
         cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
-        HoodieTableVersion.current());
+        tableVersion);
     cfg.recordMergeMode = mergeCfgs.getLeft();
     cfg.payloadClassName = mergeCfgs.getMiddle();
     cfg.recordMergeStrategyId = mergeCfgs.getRight();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 4d6729bd2da4..42f5aab7559a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1163,7 +1163,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     extraProps.setProperty("hoodie.datasource.write.table.type", "MERGE_ON_READ");
     extraProps.setProperty("hoodie.datasource.compaction.async.enable", "false");
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false);
+        PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false, false);
     String tableBasePath = basePath + "test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config deltaCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(), null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -1835,34 +1835,36 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testBulkInsertRowWriterMultiBatches(true, null);
   }
 
-  @Test
-  public void testBulkInsertRowWriterWithSchemaProviderAndTransformer() throws Exception {
-    testBulkInsertRowWriterMultiBatches(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableVersion.class, names = {"SIX", "EIGHT", "NINE"})
+  public void testBulkInsertRowWriterWithSchemaProviderAndTransformer(HoodieTableVersion tableVersion) throws Exception {
+    testBulkInsertRowWriterMultiBatches(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()), false, tableVersion);
   }
 
   @Test
   public void testBulkInsertRowWriterForEmptyBatch() throws Exception {
-    testBulkInsertRowWriterMultiBatches(false, null, true);
+    testBulkInsertRowWriterMultiBatches(false, null, true, HoodieTableVersion.current());
   }
 
   private void testBulkInsertRowWriterMultiBatches(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
-    testBulkInsertRowWriterMultiBatches(useSchemaProvider, transformerClassNames, false);
+    testBulkInsertRowWriterMultiBatches(useSchemaProvider, transformerClassNames, false, HoodieTableVersion.current());
  }
 
-  private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch) throws Exception {
+  private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, HoodieTableVersion hoodieTableVersion) throws Exception {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
     int parquetRecordsCount = 100;
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-        PARQUET_SOURCE_ROOT, false, "partition_path", "");
+        PARQUET_SOURCE_ROOT, false, "partition_path", "", false, true);
 
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
         testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName() : ParquetDFSSource.class.getName(), transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
-        useSchemaProvider, 100000, false, null, null, "timestamp", null);
+        useSchemaProvider, 100000, false, null, null, "timestamp", null, false, hoodieTableVersion);
     cfg.configs.add(DataSourceWriteOptions.ENABLE_ROW_WRITER().key() + "=true");
+    cfg.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + hoodieTableVersion.versionCode());
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamer.sync();
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 620d9d23ac6b..d1caf5058ecb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -215,7 +215,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase extends HoodieDeltaStrea
           transformerClassNames, PROPS_FILENAME_TEST_AVRO_KAFKA, false, useSchemaProvider, 100000, false, null, tableType, "timestamp", null);
     } else {
       prepareParquetDFSSource(false, hasTransformer, sourceSchemaFile, targetSchemaFile, PROPS_FILENAME_TEST_PARQUET,
-          PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false);
+          PARQUET_SOURCE_ROOT, false, "partition_path", "", extraProps, false, false);
       cfg = TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, ParquetDFSSource.class.getName(),
           transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false, useSchemaProvider, dfsSourceLimitBytes, false, null, tableType, "timestamp", null);
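
For reference, a minimal Java sketch of the calling pattern this patch establishes for row-writer bulk inserts: the caller starts the commit and hands the resulting instant time to the executor, instead of the executor calling startCommit() inside preExecute(). The Hudi classes and methods used below are the ones touched by the diff above; the wrapper class, its method name, and its parameters are illustrative only and assume the caller has already built the write config, write client, and source Dataset<Row>.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.commit.DatasetBulkInsertCommitActionExecutor;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Illustrative wrapper; only the Hudi calls inside the method are taken from the patch.
public class RowWriterBulkInsertSketch {

  static void bulkInsert(HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient,
                         Dataset<Row> df, boolean isTablePartitioned) {
    // Resolve the commit action type first (BUCKET_RESCALE now also maps to a replacecommit).
    String commitActionType =
        CommitUtils.getCommitActionType(WriteOperationType.BULK_INSERT, writeConfig.getTableType());
    // The caller starts the commit, so it owns the instant time...
    String instantTime = writeClient.startCommit(commitActionType);
    // ...and passes it to the executor through the new constructor parameter.
    new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime)
        .execute(df, isTablePartitioned);
  }
}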
