nsivabalan commented on code in PR #9203:
URL: https://github.com/apache/hudi/pull/9203#discussion_r1267546687


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -642,11 +642,11 @@ object DataSourceWriteOptions {
   val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] = 
HoodieTableConfig.DROP_PARTITION_COLUMNS
 
   val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
-    .key("hoodie.spark.sql.writes.optimized.enable")
+    .key("hoodie.spark.sql.optimized.writes.enable")

Review Comment:
   Generally we try to align the variable naming with the config key. 
   Let's name the variable 
   SPARK_SQL_OPTIMIZED_WRITES
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
           + "By default, no Hudi client init callback is executed.");
 
+  public static final String WRITE_PREPPED_MERGE_KEY = 
"_hoodie.datasource.merge.into.prepped";

Review Comment:
   Can we add Javadoc calling out the purpose of this?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala:
##########
@@ -22,122 +22,128 @@ import org.apache.hudi.{HoodieSparkUtils, 
ScalaAssertionSupport}
 class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase 
with ScalaAssertionSupport {
 
   test("Test Merge into extra cond") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName'
-           | tblproperties (
-           |  primaryKey ='id',
-           |  preCombineField = 'ts'
-           | )
+    Seq(true, false).foreach { optimizedSqlEnabled =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | tblproperties (
+             |  primaryKey ='id',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
-      val tableName2 = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName2 (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName2'
-           | tblproperties (
-           |  primaryKey ='id',
-           |  preCombineField = 'ts'
-           | )
+        val tableName2 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName2 (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName2'
+             | tblproperties (
+             |  primaryKey ='id',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
 
-      spark.sql(
-        s"""
-           |insert into $tableName values
-           |    (1, 'a1', 10, 100),
-           |    (2, 'a2', 20, 200),
-           |    (3, 'a3', 20, 100)
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |insert into $tableName2 values
-           |    (1, 'u1', 10, 999),
-           |    (3, 'u3', 30, 9999),
-           |    (4, 'u4', 40, 99999)
-           |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName values
+             |    (1, 'a1', 10, 100),
+             |    (2, 'a2', 20, 200),
+             |    (3, 'a3', 20, 100)
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into $tableName2 values
+             |    (1, 'u1', 10, 999),
+             |    (3, 'u3', 30, 9999),
+             |    (4, 'u4', 40, 99999)
+             |""".stripMargin)
 
-      spark.sql(
-        s"""
-           |merge into $tableName as oldData
-           |using $tableName2
-           |on oldData.id = $tableName2.id
-           |when matched and oldData.price = $tableName2.price then update set 
oldData.name = $tableName2.name
-           |
-           |""".stripMargin)
+        // test with optimized sql merge enabled / disabled.
+        spark.sql(s"set 
hoodie.spark.sql.optimized.merge.enable=$optimizedSqlEnabled")
 
-      checkAnswer(s"select id, name, price, ts from $tableName")(
-        Seq(1, "u1", 10.0, 100),
-        Seq(3, "a3", 20.0, 100),
-        Seq(2, "a2", 20.0, 200)
-      )
+        spark.sql(
+          s"""
+             |merge into $tableName as oldData
+             |using $tableName2
+             |on oldData.id = $tableName2.id
+             |when matched and oldData.price = $tableName2.price then update 
set oldData.name = $tableName2.name
+             |
+             |""".stripMargin)
 
-      val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
-        "Only simple conditions of the form `t.id = s.id` using primary key or 
partition path " +
-          "columns are allowed on tables with primary key. (illegal column(s) 
used: `price`"
-      } else {
-        "Only simple conditions of the form `t.id = s.id` using primary key or 
partition path " +
-          "columns are allowed on tables with primary key. (illegal column(s) 
used: `price`;"
-      }
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "u1", 10.0, 100),
+          Seq(3, "a3", 20.0, 100),
+          Seq(2, "a2", 20.0, 200)
+        )
 
-      checkException(
-        s"""
-           |merge into $tableName as oldData
-           |using $tableName2
-           |on oldData.id = $tableName2.id and oldData.price = 
$tableName2.price
-           |when matched then update set oldData.name = $tableName2.name
-           |when not matched then insert *
-           |""".stripMargin)(errorMessage)
+        val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+          "Only simple conditions of the form `t.id = s.id` using primary key 
or partition path " +
+            "columns are allowed on tables with primary key. (illegal 
column(s) used: `price`"
+        } else {
+          "Only simple conditions of the form `t.id = s.id` using primary key 
or partition path " +
+            "columns are allowed on tables with primary key. (illegal 
column(s) used: `price`;"
+        }
 
-      //test with multiple pks
-      val tableName3 = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName3 (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | location '${tmp.getCanonicalPath}/$tableName3'
-           | tblproperties (
-           |  primaryKey ='id,name',
-           |  preCombineField = 'ts'
-           | )
+        checkException(
+          s"""
+             |merge into $tableName as oldData
+             |using $tableName2
+             |on oldData.id = $tableName2.id and oldData.price = 
$tableName2.price
+             |when matched then update set oldData.name = $tableName2.name
+             |when not matched then insert *
+             |""".stripMargin)(errorMessage)
+
+        //test with multiple pks
+        val tableName3 = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName3 (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName3'
+             | tblproperties (
+             |  primaryKey ='id,name',
+             |  preCombineField = 'ts'
+             | )
        """.stripMargin)
 
-      val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
-        "Hudi tables with primary key are required to match on all primary key 
colums. Column: 'name' not found"
-      } else {
-        "Hudi tables with primary key are required to match on all primary key 
colums. Column: 'name' not found;"
-      }
+        val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+          "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found"
+        } else {
+          "Hudi tables with primary key are required to match on all primary 
key colums. Column: 'name' not found;"
+        }
 
-      checkException(
-        s"""
-           |merge into $tableName3 as oldData
-           |using $tableName2
-           |on oldData.id = $tableName2.id
-           |when matched then update set oldData.name = $tableName2.name
-           |when not matched then insert *
-           |""".stripMargin)(errorMessage2)
+        checkException(
+          s"""
+             |merge into $tableName3 as oldData
+             |using $tableName2
+             |on oldData.id = $tableName2.id
+             |when matched then update set oldData.name = $tableName2.name
+             |when not matched then insert *
+             |""".stripMargin)(errorMessage2)
+      }
     }
   }
 
   test("Test pkless complex merge cond") {
     withRecordType()(withTempDir { tmp =>
       spark.sql("set hoodie.payload.combined.schema.validate = true")
+      spark.sql("set hoodie.spark.sql.optimized.merge.enable=true")

Review Comment:
   Don't we need to fix all of these? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               rawDf: DataFrame): BaseRelation = {
-    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
-      optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), 
SQL_MERGE_INTO_WRITES.defaultValue().toString))
-      .equalsIgnoreCase("true")) {
+    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, 
"false").toBoolean || optParams.getOrDefault(WRITE_PREPPED_MERGE_KEY, 
"false").toBoolean) {

Review Comment:
   And the config key should be 
   _hoodie.spark.sql.writes.prepped



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
           + "By default, no Hudi client init callback is executed.");
 
+  public static final String WRITE_PREPPED_MERGE_KEY = 
"_hoodie.datasource.merge.into.prepped";

Review Comment:
   Let's name the variable 
   SPARK_SQL_MERGE_INTO_PREPPED



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               rawDf: DataFrame): BaseRelation = {
-    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
-      optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(), 
SQL_MERGE_INTO_WRITES.defaultValue().toString))
-      .equalsIgnoreCase("true")) {
+    val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY, 
"false").toBoolean || optParams.getOrDefault(WRITE_PREPPED_MERGE_KEY, 
"false").toBoolean) {

Review Comment:
   Let's also rename the variable 
   DATASOURCE_WRITE_PREPPED_KEY to 
   SPARK_SQL_WRITE_PREPPED_KEY



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "The class must be a subclass of 
`org.apache.hudi.callback.HoodieClientInitCallback`."
           + "By default, no Hudi client init callback is executed.");
 
+  public static final String WRITE_PREPPED_MERGE_KEY = 
"_hoodie.datasource.merge.into.prepped";

Review Comment:
   Sorry — let's name this
   _hoodie.spark.sql.merge.into.prepped



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to