This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4437495d39b [HUDI-8083] Read precombine value from both table write config (#11777)
4437495d39b is described below

commit 4437495d39ba8a96328e023bc9ce6c08d03ffa5e
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Sep 5 18:26:38 2024 -0700

    [HUDI-8083] Read precombine value from both table write config (#11777)
    
    Co-authored-by: Shawn Chang <[email protected]>
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  3 +-
 .../spark/sql/hudi/TestProvidesHoodieConfig.scala  | 76 ++++++++++++++++++++--
 2 files changed, 73 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 976f8496415..29918e2df70 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -191,7 +191,8 @@ trait ProvidesHoodieConfig extends Logging {
     // NOTE: Here we fallback to "" to make sure that null value is not overridden with
     // default value ("ts")
     // TODO(HUDI-3456) clean up
-    val preCombineField = combinedOpts.getOrElse(PRECOMBINE_FIELD.key, "")
+    val preCombineField = combinedOpts.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key,
+      combinedOpts.getOrElse(PRECOMBINE_FIELD.key, ""))
 
     val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
     val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
index a5572998409..d7e061ed47c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
@@ -19,13 +19,19 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.hive.HiveSyncConfig
 import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator}
+import org.apache.spark.sql.{RuntimeConfig, SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.internal.{SQLConf, SessionState, StaticSQLConf}
+import org.apache.spark.sql.types.StructType
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
-import org.mockito.Mockito
-import org.mockito.Mockito.when
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, spy, when}
 
 /**
  * Tests {@link ProvidesHoodieConfig}
@@ -33,7 +39,7 @@ import org.mockito.Mockito.when
 class TestProvidesHoodieConfig {
   @Test
   def testGetPartitionPathFieldWriteConfig(): Unit = {
-    val mockTable = Mockito.mock(classOf[HoodieCatalogTable])
+    val mockTable = mock(classOf[HoodieCatalogTable])
     val partitionFieldNames = "ts,segment"
     val customKeyGenPartitionFieldWriteConfig = "ts:timestamp,segment:simple"
 
@@ -66,10 +72,70 @@ class TestProvidesHoodieConfig {
         classOf[CustomKeyGenerator].getName, partitionFieldNames, mockTable))
   }
 
+  @Test
+  def testInferPrecombineFieldFromTableConfig(): Unit = {
+    // ProvidesHoodieConfig should be able to infer precombine field from table config
+    // mock catalogTable
+    val mockCatalog = mock(classOf[HoodieCatalogTable])
+    // catalogProperties won't be passed in correctly, because they were not synced properly
+    when(mockCatalog.catalogProperties).thenReturn(Map.empty[String, String])
+    when(mockCatalog.partitionFields).thenReturn(Array("partition"))
+    when(mockCatalog.preCombineKey).thenCallRealMethod()
+    when(mockCatalog.partitionSchema).thenReturn(StructType(Nil))
+    when(mockCatalog.primaryKeys).thenReturn(Array("key"))
+    when(mockCatalog.tableName).thenReturn("hudi_table")
+    val props = new TypedProperties()
+    props.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key, "segment")
+    val mockTableConfig = spy(classOf[HoodieTableConfig])
+    when(mockTableConfig.getProps).thenReturn(props)
+    when(mockCatalog.tableConfig).thenReturn(mockTableConfig)
+
+    // mock spark session and sqlConf
+    val mockSparkSession = mock(classOf[SparkSession])
+    val mockSessionState = mock(classOf[SessionState])
+    val mockRuntimeConf = mock(classOf[RuntimeConfig])
+    val mockSQLConf = mock(classOf[SQLConf])
+    val mockSQLContext = mock(classOf[SQLContext])
+    when(mockSparkSession.sqlContext).thenReturn(mockSQLContext)
+    when(mockSparkSession.sessionState).thenReturn(mockSessionState)
+    when(mockRuntimeConf.getOption(any())).thenReturn(Option.empty)
+    when(mockSparkSession.conf).thenReturn(mockRuntimeConf)
+    when(mockSessionState.conf).thenReturn(mockSQLConf)
+    when(mockSQLContext.conf).thenReturn(mockSQLConf)
+    when(mockSQLConf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)).thenReturn("nothive")
+    when(mockSQLConf.getAllConfs).thenReturn(Map.empty[String, String])
+
+    val mockCmd = mock(classOf[ProvidesHoodieConfig])
+    when(mockCmd.buildHiveSyncConfig(any(), any(), any(), any()))
+      .thenReturn(new HiveSyncConfig(new TypedProperties()))
+    when(mockCmd.getDropDupsConfig(any(), any())).thenReturn(Map.empty[String, String])
+    when(mockCmd.buildHoodieInsertConfig(any(), any(), any(), any(), any(), any(), any()))
+      .thenCallRealMethod()
+    val combinedConfig = mockCmd.buildHoodieInsertConfig(
+      mockCatalog,
+      mockSparkSession,
+      isOverwritePartition = false,
+      isOverwriteTable = false,
+      Map.empty,
+      Map.empty,
+      Option.empty)
+
+    assertEquals(
+      "segment",
+      combinedConfig.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
+    )
+
+    // write config precombine field should be inferred from table config
+    assertEquals(
+      "segment",
+      combinedConfig.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "")
+    )
+  }
+
   private def mockPartitionWriteConfigInCatalogProps(mockTable: HoodieCatalogTable,
                                                      value: Option[String]): Unit = {
     val props = if (value.isDefined) {
-      Map(PARTITIONPATH_FIELD.key() -> value.get)
+      Map(DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> value.get)
     } else {
       Map[String, String]()
     }

Reply via email to