This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f82023aecb5c3160520490b1146545574c6b188c
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Apr 11 21:20:07 2024 -0400

    [HUDI-7605] Allow merger strategy to be set in spark sql writer (#10999)
---
 .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala |  1 +
 .../apache/hudi/functional/TestMORDataSource.scala   | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7020781faf0..ad19ec48c7a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -302,6 +302,7 @@ class HoodieSparkSqlWriterInternal {
           .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
           .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
           .setCommitTimezone(timelineTimeZone)
+          .setRecordMergerStrategy(hoodieConfig.getStringOrDefault(DataSourceWriteOptions.RECORD_MERGER_STRATEGY))
           .initTable(sparkContext.hadoopConfiguration, path)
       }
       val instantTime = HoodieActiveTimeline.createNewInstantTime()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 45bd3c645d4..b878eb76c40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -1403,4 +1403,24 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
       basePath
     }
   }
+
+  @Test
+  def testMergerStrategySet(): Unit = { // write with a merger strategy option, then read it back from the table config
+    val (writeOpts, _) = getWriterReaderOpts()
+    val input = recordsToStrings(dataGen.generateInserts("000", 1)).asScala
+    val inputDf = spark.read.json(spark.sparkContext.parallelize(input, 1))
+    val mergerStrategyName = "example_merger_strategy" // value passed on write and expected back below
+    inputDf.write.format("hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, "MERGE_ON_READ")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key(), mergerStrategyName)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.builder() // rebuild the meta client from the path just written
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    assertEquals(mergerStrategyName, metaClient.getTableConfig.getRecordMergerStrategy) // (expected, actual) order keeps failure messages correctly labelled
+  }
 }

Reply via email to