This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b7a6656ca [HUDI-5171] Ensure validateTableConfig also checks for
partition path field value switch (#7163)
0b7a6656ca is described below
commit 0b7a6656ca83c257fa65bee48a09037306766a5d
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Nov 10 13:39:06 2022 -0500
[HUDI-5171] Ensure validateTableConfig also checks for partition path field
value switch (#7163)
Co-authored-by: Jonathan Vexler <=>
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 +--
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 7 +++++++
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 23 ++++++++++++++++++++++
.../apache/spark/sql/hudi/TestRepairTable.scala | 4 ++--
4 files changed, 33 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7bbc64782c..1248d93d58 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -85,14 +85,13 @@ object HoodieSparkSqlWriter {
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
tableExists = fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))
var tableConfig = getHoodieTableConfig(sparkContext, path,
hoodieTableConfigOpt)
- validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode
== SaveMode.Overwrite)
-
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams,
tableConfig, mode)
val originKeyGeneratorClassName =
HoodieWriterUtils.getOriginKeyGenerator(parameters)
val timestampKeyGeneratorConfigs =
extractConfigsRelatedToTimestampBasedKeyGenerator(
originKeyGeneratorClassName, parameters)
//validate datasource and tableconfig keygen are the same
validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
+ validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode
== SaveMode.Overwrite);
val databaseName =
hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 54bed6de7b..0f97fe3f04 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -159,6 +159,13 @@ object HoodieWriterUtils {
&& datasourceKeyGen != tableConfigKeyGen) {
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
}
+
+ val datasourcePartitionFields =
params.getOrElse(PARTITIONPATH_FIELD.key(), null)
+ val tableConfigPartitionFields =
tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
+ if (null != datasourcePartitionFields && null !=
tableConfigPartitionFields
+ && datasourcePartitionFields != tableConfigPartitionFields) {
+
diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n")
+ }
}
if (diffConfigs.nonEmpty) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 34961829db..86009b42e5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -312,6 +312,29 @@ class TestHoodieSparkSqlWriter {
assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite,
tableModifier2, dataFrame2)._1)
}
+ /**
+ * Test case ensuring the partitionpath field cannot be changed
+ */
+ @Test
+ def testChangePartitionPath(): Unit = {
+ //create a new table
+ val tableModifier1 = Map("path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+ "hoodie.datasource.write.recordkey.field" -> "uuid",
"hoodie.datasource.write.partitionpath.field" -> "ts")
+ val dataFrame =
spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new
Date().getTime)))
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1,
dataFrame)
+
+ //on the same path, writing with a different partitionpath field in Append
SaveMode should throw an exception
+ val tableModifier2 = Map("path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+ "hoodie.datasource.write.recordkey.field" -> "uuid",
"hoodie.datasource.write.partitionpath.field" -> "uuid")
+ val dataFrame2 =
spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new
Date().getTime)))
+ val hoodieException =
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext,
SaveMode.Append, tableModifier2, dataFrame2))
+ assert(hoodieException.getMessage.contains("Config conflict"))
+ assert(hoodieException.getMessage.contains(s"PartitionPath:\tuuid\tts"))
+
+ //on the same path, writing with a different partitionpath field in Overwrite
SaveMode should be successful.
+ assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite,
tableModifier2, dataFrame2)._1)
+ }
+
/**
* Test case for each bulk insert sort mode
*
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
index 10c2b443c0..1a04c54fe5 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
@@ -85,7 +85,7 @@ class TestRepairTable extends HoodieSparkSqlTestBase {
df.write.format("hudi")
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
- .option(PARTITIONPATH_FIELD.key, "dt, hh")
+ .option(PARTITIONPATH_FIELD.key, "dt,hh")
.option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
.mode(SaveMode.Append)
.save(basePath)
@@ -111,7 +111,7 @@ class TestRepairTable extends HoodieSparkSqlTestBase {
.option(TBL_NAME.key(), tableName)
.option(RECORDKEY_FIELD.key, "id")
.option(PRECOMBINE_FIELD.key, "ts")
- .option(PARTITIONPATH_FIELD.key, "dt, hh")
+ .option(PARTITIONPATH_FIELD.key, "dt,hh")
.option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
.mode(SaveMode.Append)
.save(basePath)