This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 0c5ee4f8844 [HUDI-7807] Fixing spark-sql for pk less tables (#11354)
0c5ee4f8844 is described below
commit 0c5ee4f88448e7394dd35d465a12a6ee518067a9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 29 09:52:30 2024 -0700
[HUDI-7807] Fixing spark-sql for pk less tables (#11354)
---
.../java/org/apache/hudi/keygen/KeyGenUtils.java | 4 +-
.../factory/HoodieSparkKeyGeneratorFactory.java | 3 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 4 +-
.../spark/sql/hudi/dml/TestDeleteTable.scala | 16 +++-
.../spark/sql/hudi/dml/TestUpdateTable.scala | 91 ++++++++++++----------
5 files changed, 69 insertions(+), 49 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 4d7c83a7794..34af55fd85a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -268,6 +268,8 @@ public class KeyGenUtils {
* @return true if record keys need to be auto generated. false otherwise.
*/
   public static boolean isAutoGeneratedRecordKeysEnabled(TypedProperties props) {
-    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        || props.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).equals(StringUtils.EMPTY_STRING);
+    // spark-sql sets the record key config to an empty string for update, and a couple of other statements.
   }
 }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index c655bf62543..2b3315fefb4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -88,6 +88,9 @@ public class HoodieSparkKeyGeneratorFactory {
       //Need to prevent overwriting the keygen for spark sql merge into because we need to extract
       //the recordkey from the meta cols if it exists. Sql keygen will use pkless keygen if needed.
&& !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
+ if (autoRecordKeyGen) {
+ props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+ }
     KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
if (autoRecordKeyGen) {
return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator)
keyGenerator);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5b9b57cf10c..1a8031b9fe2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -228,8 +228,8 @@ class HoodieSparkSqlWriterInternal {
originKeyGeneratorClassName, paramsWithoutDefaults)
// Validate datasource and tableconfig keygen are the same
-    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
+    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
     asyncCompactionTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
     asyncClusteringTriggerFnDefined = streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
index b9cafb6ec07..c157091d94d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
@@ -80,28 +80,35 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
test("Test Delete Table Without Primary Key") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
+ Seq (true, false).foreach { isPartitioned =>
val tableName = generateTableName
+ val partitionedClause = if (isPartitioned) {
+ "PARTITIONED BY (name)"
+ } else {
+ ""
+ }
// create table
spark.sql(
s"""
|create table $tableName (
| id int,
- | name string,
| price double,
- | ts long
+ | ts long,
+ | name string
|) using hudi
| location '${tmp.getCanonicalPath}/$tableName'
| tblproperties (
| type = '$tableType',
| preCombineField = 'ts'
| )
+ | $partitionedClause
""".stripMargin)
// test with optimized sql writes enabled.
spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
// insert data to table
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 1, 10, 1000, 'a1'")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(1, "a1", 10.0, 1000)
)
@@ -112,7 +119,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
Seq(0)
)
- spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 10, 1000, 'a2'")
spark.sql(s"delete from $tableName where id = 1")
checkAnswer(s"select id, name, price, ts from $tableName")(
Seq(2, "a2", 10.0, 1000)
@@ -124,6 +131,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
)
}
}
+ }
}
test("Test Delete Table On Non-PK Condition") {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
index 8bdfe258bb7..5162b664880 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
@@ -77,54 +77,61 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
test("Test Update Table Without Primary Key") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- val tableName = generateTableName
- // create table
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | type = '$tableType',
- | preCombineField = 'ts'
- | )
- """.stripMargin)
-
- // insert data to table
- spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
+ Seq(true, false).foreach { isPartitioned =>
+ val tableName = generateTableName
+ val partitionedClause = if (isPartitioned) {
+ "PARTITIONED BY (name)"
+ } else {
+ ""
+ }
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | price double,
+ | ts long,
+ | name string
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | type = '$tableType',
+ | preCombineField = 'ts'
+ | )
+ | $partitionedClause
+ """.stripMargin)
- // test with optimized sql writes enabled.
- spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1,10, 1000, 'a1'")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
- // update data
- spark.sql(s"update $tableName set price = 20 where id = 1")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 20.0, 1000)
- )
+ // test with optimized sql writes enabled.
+ spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
- // update data
- spark.sql(s"update $tableName set price = price * 2 where id = 1")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 40.0, 1000)
- )
+ // update data
+ spark.sql(s"update $tableName set price = 20 where id = 1")
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 20.0, 1000)
+ )
- // verify default compaction w/ MOR
- if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
- spark.sql(s"update $tableName set price = price * 2 where id = 1")
- spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ // update data
spark.sql(s"update $tableName set price = price * 2 where id = 1")
- // verify compaction is complete
- val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/"
+ tableName)
-
assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction,
"commit")
- }
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 40.0, 1000)
+ )
+ // verify default compaction w/ MOR
+ if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+ spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ spark.sql(s"update $tableName set price = price * 2 where id = 1")
+ // verify compaction is complete
+ val metaClient = createMetaClient(spark, tmp.getCanonicalPath +
"/" + tableName)
+
assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction,
"commit")
+ }
+ }
}
})
}