This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


commit a5f4e8f1894e33dfa66318867b4fbb11e429dfe6
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Apr 11 21:20:07 2024 -0400

    [HUDI-7605] Allow merger strategy to be set in spark sql writer (#10999)
---
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala  |  1 +
 .../apache/hudi/functional/TestMORDataSource.scala    | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7020781faf0..ad19ec48c7a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -302,6 +302,7 @@ class HoodieSparkSqlWriterInternal {
         .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
         .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
         .setCommitTimezone(timelineTimeZone)
+        .setRecordMergerStrategy(hoodieConfig.getStringOrDefault(DataSourceWriteOptions.RECORD_MERGER_STRATEGY))
         .initTable(sparkContext.hadoopConfiguration, path)
     }
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 45bd3c645d4..b878eb76c40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1403,4 +1403,24 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       basePath
     }
   }
+
+  @Test
+  def testMergerStrategySet(): Unit = {
+    val (writeOpts, _) = getWriterReaderOpts()
+    val input = recordsToStrings(dataGen.generateInserts("000", 1)).asScala
+    val inputDf = spark.read.json(spark.sparkContext.parallelize(input, 1))
+    val mergerStrategyName = "example_merger_strategy"
+    inputDf.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, "MERGE_ON_READ")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), mergerStrategyName)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    assertEquals(metaClient.getTableConfig.getRecordMergerStrategy, mergerStrategyName)
+  }
 }
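For anyone picking this change up downstream: the option surfaced here can be set
directly on a DataFrame write, and it is persisted into the table config at table
creation time by the new .setRecordMergerStrategy(...) call above. A minimal usage
sketch follows (not from the commit; the table name, record key field, base path,
and the strategy ID "example_merger_strategy" are illustrative placeholders, and
the ID must match the merging strategy reported by the HoodieRecordMerger
implementations configured for the table):

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.spark.sql.SaveMode

    // Sketch: set a custom record merger strategy ID on the first write of a
    // MERGE_ON_READ table, assuming an existing DataFrame `df`. Table name,
    // record key field, and base path below are illustrative placeholders.
    df.write.format("hudi")
      .option("hoodie.table.name", "example_table")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option(DataSourceWriteOptions.TABLE_TYPE.key, "MERGE_ON_READ")
      .option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key, "example_merger_strategy")
      .mode(SaveMode.Overwrite)
      .save("/tmp/example_table")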
