This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 0c5ee4f8844 [HUDI-7807] Fixing spark-sql for pk less tables (#11354)
0c5ee4f8844 is described below

commit 0c5ee4f88448e7394dd35d465a12a6ee518067a9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 29 09:52:30 2024 -0700

    [HUDI-7807] Fixing spark-sql for pk less tables (#11354)
---
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |  4 +-
 .../factory/HoodieSparkKeyGeneratorFactory.java    |  3 +
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  4 +-
 .../spark/sql/hudi/dml/TestDeleteTable.scala       | 16 +++-
 .../spark/sql/hudi/dml/TestUpdateTable.scala       | 91 ++++++++++++----------
 5 files changed, 69 insertions(+), 49 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 4d7c83a7794..34af55fd85a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -268,6 +268,8 @@ public class KeyGenUtils {
    * @return true if record keys need to be auto generated. false otherwise.
    */
   public static boolean isAutoGeneratedRecordKeysEnabled(TypedProperties 
props) {
-    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    return !props.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())
+        || 
props.getProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()).equals(StringUtils.EMPTY_STRING);
+    // spark-sql sets the record key config to an empty string for update and a couple 
of other statements.
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index c655bf62543..2b3315fefb4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -88,6 +88,9 @@ public class HoodieSparkKeyGeneratorFactory {
         //Need to prevent overwriting the keygen for spark sql merge into 
because we need to extract
         //the recordkey from the meta cols if it exists. Sql keygen will use 
pkless keygen if needed.
         && !props.getBoolean(SPARK_SQL_MERGE_INTO_PREPPED_KEY, false);
+    if (autoRecordKeyGen) {
+      props.remove(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key());
+    }
     KeyGenerator keyGenerator = (KeyGenerator) 
ReflectionUtils.loadClass(keyGeneratorClass, props);
     if (autoRecordKeyGen) {
       return new AutoRecordGenWrapperKeyGenerator(props, (BuiltinKeyGenerator) 
keyGenerator);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5b9b57cf10c..1a8031b9fe2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -228,8 +228,8 @@ class HoodieSparkSqlWriterInternal {
       originKeyGeneratorClassName, paramsWithoutDefaults)
 
     // Validate datasource and tableconfig keygen are the same
-    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode 
== SaveMode.Overwrite);
+    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode 
== SaveMode.Overwrite)
 
     asyncCompactionTriggerFnDefined = 
streamingWritesParamsOpt.map(_.asyncCompactionTriggerFn.isDefined).orElse(Some(false)).get
     asyncClusteringTriggerFnDefined = 
streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.isDefined).orElse(Some(false)).get
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
index b9cafb6ec07..c157091d94d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDeleteTable.scala
@@ -80,28 +80,35 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
   test("Test Delete Table Without Primary Key") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
+        Seq (true, false).foreach { isPartitioned =>
         val tableName = generateTableName
+        val partitionedClause = if (isPartitioned) {
+          "PARTITIONED BY (name)"
+        } else {
+          ""
+        }
         // create table
         spark.sql(
           s"""
              |create table $tableName (
              |  id int,
-             |  name string,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  name string
              |) using hudi
              | location '${tmp.getCanonicalPath}/$tableName'
              | tblproperties (
              |  type = '$tableType',
              |  preCombineField = 'ts'
              | )
+             | $partitionedClause
    """.stripMargin)
 
         // test with optimized sql writes enabled.
         spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
 
         // insert data to table
-        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+        spark.sql(s"insert into $tableName select 1, 10, 1000, 'a1'")
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(1, "a1", 10.0, 1000)
         )
@@ -112,7 +119,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
           Seq(0)
         )
 
-        spark.sql(s"insert into $tableName select 2, 'a2', 10, 1000")
+        spark.sql(s"insert into $tableName select 2, 10, 1000, 'a2'")
         spark.sql(s"delete from $tableName where id = 1")
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(2, "a2", 10.0, 1000)
@@ -124,6 +131,7 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
         )
       }
     }
+    }
   }
 
   test("Test Delete Table On Non-PK Condition") {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
index 8bdfe258bb7..5162b664880 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
@@ -77,54 +77,61 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
   test("Test Update Table Without Primary Key") {
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        // create table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}/$tableName'
-             | tblproperties (
-             |  type = '$tableType',
-             |  preCombineField = 'ts'
-             | )
-   """.stripMargin)
-
-        // insert data to table
-        spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+        Seq(true, false).foreach { isPartitioned =>
+          val tableName = generateTableName
+          val partitionedClause = if (isPartitioned) {
+            "PARTITIONED BY (name)"
+          } else {
+            ""
+          }
+          // create table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  price double,
+               |  ts long,
+               |  name string
+               |) using hudi
+               | location '${tmp.getCanonicalPath}/$tableName'
+               | tblproperties (
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
+               | $partitionedClause
+            """.stripMargin)
 
-        // test with optimized sql writes enabled.
-        spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
+          // insert data to table
+          spark.sql(s"insert into $tableName select 1,10, 1000, 'a1'")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 10.0, 1000)
+          )
 
-        // update data
-        spark.sql(s"update $tableName set price = 20 where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 20.0, 1000)
-        )
+          // test with optimized sql writes enabled.
+          spark.sql(s"set ${SPARK_SQL_OPTIMIZED_WRITES.key()}=true")
 
-        // update data
-        spark.sql(s"update $tableName set price = price * 2 where id = 1")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 40.0, 1000)
-        )
+          // update data
+          spark.sql(s"update $tableName set price = 20 where id = 1")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 20.0, 1000)
+          )
 
-        // verify default compaction w/ MOR
-        if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-          spark.sql(s"update $tableName set price = price * 2 where id = 1")
-          spark.sql(s"update $tableName set price = price * 2 where id = 1")
+          // update data
           spark.sql(s"update $tableName set price = price * 2 where id = 1")
-          // verify compaction is complete
-          val metaClient = createMetaClient(spark, tmp.getCanonicalPath + "/" 
+ tableName)
-          
assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction,
 "commit")
-        }
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 40.0, 1000)
+          )
 
+          // verify default compaction w/ MOR
+          if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+            spark.sql(s"update $tableName set price = price * 2 where id = 1")
+            spark.sql(s"update $tableName set price = price * 2 where id = 1")
+            spark.sql(s"update $tableName set price = price * 2 where id = 1")
+            // verify compaction is complete
+            val metaClient = createMetaClient(spark, tmp.getCanonicalPath + 
"/" + tableName)
+            
assertEquals(metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getLeft.getAction,
 "commit")
+          }
+        }
       }
     })
   }

Reply via email to