This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4437495d39b [HUDI-8083] Read precombine value from both table and write
config (#11777)
4437495d39b is described below
commit 4437495d39ba8a96328e023bc9ce6c08d03ffa5e
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Sep 5 18:26:38 2024 -0700
[HUDI-8083] Read precombine value from both table and write config (#11777)
Co-authored-by: Shawn Chang <[email protected]>
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 3 +-
.../spark/sql/hudi/TestProvidesHoodieConfig.scala | 76 ++++++++++++++++++++--
2 files changed, 73 insertions(+), 6 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 976f8496415..29918e2df70 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -191,7 +191,8 @@ trait ProvidesHoodieConfig extends Logging {
// NOTE: Here we fallback to "" to make sure that null value is not overridden with
// default value ("ts")
// TODO(HUDI-3456) clean up
- val preCombineField = combinedOpts.getOrElse(PRECOMBINE_FIELD.key, "")
+ val preCombineField =
combinedOpts.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key,
+ combinedOpts.getOrElse(PRECOMBINE_FIELD.key, ""))
val hiveStylePartitioningEnable =
Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true")
val urlEncodePartitioning =
Option(tableConfig.getUrlEncodePartitioning).getOrElse("false")
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
index a5572998409..d7e061ed47c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/hudi/TestProvidesHoodieConfig.scala
@@ -19,13 +19,19 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator}
+import org.apache.spark.sql.{RuntimeConfig, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.internal.{SQLConf, SessionState, StaticSQLConf}
+import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
-import org.mockito.Mockito
-import org.mockito.Mockito.when
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, spy, when}
/**
* Tests {@link ProvidesHoodieConfig}
@@ -33,7 +39,7 @@ import org.mockito.Mockito.when
class TestProvidesHoodieConfig {
@Test
def testGetPartitionPathFieldWriteConfig(): Unit = {
- val mockTable = Mockito.mock(classOf[HoodieCatalogTable])
+ val mockTable = mock(classOf[HoodieCatalogTable])
val partitionFieldNames = "ts,segment"
val customKeyGenPartitionFieldWriteConfig = "ts:timestamp,segment:simple"
@@ -66,10 +72,70 @@ class TestProvidesHoodieConfig {
classOf[CustomKeyGenerator].getName, partitionFieldNames, mockTable))
}
+ @Test
+ def testInferPrecombineFieldFromTableConfig(): Unit = {
+ // ProvidesHoodieConfig should be able to infer precombine field from table config
+ // mock catalogTable
+ val mockCatalog = mock(classOf[HoodieCatalogTable])
+ // catalogProperties won't be passed in correctly, because they were not synced properly
+ when(mockCatalog.catalogProperties).thenReturn(Map.empty[String, String])
+ when(mockCatalog.partitionFields).thenReturn(Array("partition"))
+ when(mockCatalog.preCombineKey).thenCallRealMethod()
+ when(mockCatalog.partitionSchema).thenReturn(StructType(Nil))
+ when(mockCatalog.primaryKeys).thenReturn(Array("key"))
+ when(mockCatalog.tableName).thenReturn("hudi_table")
+ val props = new TypedProperties()
+ props.setProperty(HoodieTableConfig.PRECOMBINE_FIELD.key, "segment")
+ val mockTableConfig = spy(classOf[HoodieTableConfig])
+ when(mockTableConfig.getProps).thenReturn(props)
+ when(mockCatalog.tableConfig).thenReturn(mockTableConfig)
+
+ // mock spark session and sqlConf
+ val mockSparkSession = mock(classOf[SparkSession])
+ val mockSessionState = mock(classOf[SessionState])
+ val mockRuntimeConf = mock(classOf[RuntimeConfig])
+ val mockSQLConf = mock(classOf[SQLConf])
+ val mockSQLContext = mock(classOf[SQLContext])
+ when(mockSparkSession.sqlContext).thenReturn(mockSQLContext)
+ when(mockSparkSession.sessionState).thenReturn(mockSessionState)
+ when(mockRuntimeConf.getOption(any())).thenReturn(Option.empty)
+ when(mockSparkSession.conf).thenReturn(mockRuntimeConf)
+ when(mockSessionState.conf).thenReturn(mockSQLConf)
+ when(mockSQLContext.conf).thenReturn(mockSQLConf)
+
when(mockSQLConf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)).thenReturn("nothive")
+ when(mockSQLConf.getAllConfs).thenReturn(Map.empty[String, String])
+
+ val mockCmd = mock(classOf[ProvidesHoodieConfig])
+ when(mockCmd.buildHiveSyncConfig(any(), any(), any(), any()))
+ .thenReturn(new HiveSyncConfig(new TypedProperties()))
+ when(mockCmd.getDropDupsConfig(any(), any())).thenReturn(Map.empty[String,
String])
+ when(mockCmd.buildHoodieInsertConfig(any(), any(), any(), any(), any(),
any(), any()))
+ .thenCallRealMethod()
+ val combinedConfig = mockCmd.buildHoodieInsertConfig(
+ mockCatalog,
+ mockSparkSession,
+ isOverwritePartition = false,
+ isOverwriteTable = false,
+ Map.empty,
+ Map.empty,
+ Option.empty)
+
+ assertEquals(
+ "segment",
+ combinedConfig.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
+ )
+
+ // write config precombine field should be inferred from table config
+ assertEquals(
+ "segment",
+ combinedConfig.getOrElse(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "")
+ )
+ }
+
private def mockPartitionWriteConfigInCatalogProps(mockTable:
HoodieCatalogTable,
value: Option[String]):
Unit = {
val props = if (value.isDefined) {
- Map(PARTITIONPATH_FIELD.key() -> value.get)
+ Map(DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> value.get)
} else {
Map[String, String]()
}