This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f98184f7bad [HUDI-7604] Automatically set hoodie.table.name based on
write config in Spark (#10998)
f98184f7bad is described below
commit f98184f7badc97f0b1f6297baa6858b2690f6c60
Author: Jon Vexler <[email protected]>
AuthorDate: Sun Sep 22 12:10:31 2024 -0400
[HUDI-7604] Automatically set hoodie.table.name based on write config in
Spark (#10998)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py | 4 ----
.../src/main/scala/org/apache/hudi/DataSourceOptions.scala | 7 ++++++-
.../test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 4 ----
.../apache/hudi/utilities/TestHoodieMetadataTableValidator.java | 4 ----
4 files changed, 6 insertions(+), 13 deletions(-)
diff --git a/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py b/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py
index caad76ca600..9505e321785 100644
--- a/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py
+++ b/hudi-examples/hudi-examples-spark/src/test/python/HoodiePySparkQuickstart.py
@@ -34,7 +34,6 @@ class ExamplePySpark:
'hoodie.table.name': tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': tableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
@@ -162,7 +161,6 @@ class ExamplePySpark:
'hoodie.table.name': self.tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': self.tableName,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
@@ -197,7 +195,6 @@ class ExamplePySpark:
'hoodie.table.name': self.tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': self.tableName,
'hoodie.datasource.write.operation': 'delete',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
@@ -225,7 +222,6 @@ class ExamplePySpark:
'hoodie.table.name': self.tableName,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
- 'hoodie.datasource.write.table.name': self.tableName,
'hoodie.datasource.write.operation': 'insert_overwrite',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index ae582ce2ce5..ef9ccc9aebc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -34,7 +34,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.{getKeyGeneratorClassNameFromType,
inferKeyGeneratorTypeFromWriteConfig}
import org.apache.hudi.keygen.{CustomKeyGenerator, NonpartitionedKeyGenerator,
SimpleKeyGenerator}
import org.apache.hudi.sync.common.HoodieSyncConfig
-import org.apache.hudi.util.JFunction
+import org.apache.hudi.util.{JFunction, SparkConfigUtils}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils =>
SparkDataSourceUtils}
import org.slf4j.LoggerFactory
@@ -977,6 +977,11 @@ object DataSourceOptionsHelper {
def translateConfigurations(optParams: Map[String, String]): Map[String,
String] = {
val translatedOpt = scala.collection.mutable.Map[String, String]() ++=
optParams
+ if (!SparkConfigUtils.containsConfigProperty(optParams,
HoodieTableConfig.NAME) &&
+ SparkConfigUtils.containsConfigProperty(optParams,
DataSourceWriteOptions.TABLE_NAME)) {
+ translatedOpt.put(HoodieTableConfig.NAME.key(),
+ SparkConfigUtils.getStringWithAltKeys(optParams,
DataSourceWriteOptions.TABLE_NAME))
+ }
optParams.keySet.foreach(opt => {
if (allAlternatives.contains(opt) &&
!optParams.contains(allAlternatives(opt))) {
log.warn(opt + " is deprecated and will be removed in a later release;
Please use " + allAlternatives(opt))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index c4b67799530..ec87dd74e88 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -728,7 +728,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,
"partitionpath").
option("hoodie.schema.on.read.enable","true").
option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
- option("hoodie.table.name", tableName).
mode("overwrite").
save(tablePath)
@@ -748,7 +747,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
option("hoodie.schema.on.read.enable","true").
option("hoodie.datasource.write.reconcile.schema","true").
option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
- option("hoodie.table.name", tableName).
mode("append").
save(tablePath)
spark.sql("set hoodie.schema.on.read.enable=true")
@@ -780,7 +778,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
option("hoodie.schema.on.read.enable","true").
option("hoodie.datasource.write.reconcile.schema","true").
option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
- option("hoodie.table.name", tableName).
mode("append").
save(tablePath)
spark.read.format("hudi").load(tablePath).show(false)
@@ -797,7 +794,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
option("hoodie.schema.on.read.enable","true").
option("hoodie.datasource.write.reconcile.schema","true").
option(DataSourceWriteOptions.TABLE_NAME.key(), tableName).
- option("hoodie.table.name", tableName).
mode("append").
save(tablePath)
spark.read.format("hudi").load(tablePath).createOrReplaceTempView("hudi_trips_snapshot1")
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 0f4592cedf9..2181d7a8f12 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -102,7 +102,6 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
public void testMetadataTableValidation(String viewStorageTypeForFSListing,
String viewStorageTypeForMDTListing) {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
- writeOptions.put("hoodie.table.name", "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(),
"MERGE_ON_READ");
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
"_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(),
"timestamp");
@@ -145,7 +144,6 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws
InterruptedException {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
- writeOptions.put("hoodie.table.name", "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(),
"MERGE_ON_READ");
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
"_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(),
"timestamp");
@@ -219,7 +217,6 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
public void testAdditionalFilesinMetadata(Integer lastNFileSlices) throws
IOException {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
- writeOptions.put("hoodie.table.name", "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(),
"MERGE_ON_READ");
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
"_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(),
"timestamp");
@@ -323,7 +320,6 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
public void testRliValidationFalsePositiveCase() throws IOException {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
- writeOptions.put("hoodie.table.name", "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(),
"MERGE_ON_READ");
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
"_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(),
"timestamp");