nsivabalan commented on code in PR #9203:
URL: https://github.com/apache/hudi/pull/9203#discussion_r1267546687
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -642,11 +642,11 @@ object DataSourceWriteOptions {
val DROP_PARTITION_COLUMNS: ConfigProperty[java.lang.Boolean] =
HoodieTableConfig.DROP_PARTITION_COLUMNS
val ENABLE_OPTIMIZED_SQL_WRITES: ConfigProperty[String] = ConfigProperty
- .key("hoodie.spark.sql.writes.optimized.enable")
+ .key("hoodie.spark.sql.optimized.writes.enable")
Review Comment:
Generally we try to align the variable naming to the key.
Let's name the variable
SPARK_SQL_OPTIMIZED_WRITES
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of
`org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");
+ public static final String WRITE_PREPPED_MERGE_KEY =
"_hoodie.datasource.merge.into.prepped";
Review Comment:
Can we add Javadocs calling out the purpose of this?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala:
##########
@@ -22,122 +22,128 @@ import org.apache.hudi.{HoodieSparkUtils,
ScalaAssertionSupport}
class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase
with ScalaAssertionSupport {
test("Test Merge into extra cond") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ Seq(true, false).foreach { optimizedSqlEnabled =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- val tableName2 = generateTableName
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName2'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(
- s"""
- |insert into $tableName values
- | (1, 'a1', 10, 100),
- | (2, 'a2', 20, 200),
- | (3, 'a3', 20, 100)
- |""".stripMargin)
- spark.sql(
- s"""
- |insert into $tableName2 values
- | (1, 'u1', 10, 999),
- | (3, 'u3', 30, 9999),
- | (4, 'u4', 40, 99999)
- |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName values
+ | (1, 'a1', 10, 100),
+ | (2, 'a2', 20, 200),
+ | (3, 'a3', 20, 100)
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName2 values
+ | (1, 'u1', 10, 999),
+ | (3, 'u3', 30, 9999),
+ | (4, 'u4', 40, 99999)
+ |""".stripMargin)
- spark.sql(
- s"""
- |merge into $tableName as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched and oldData.price = $tableName2.price then update set
oldData.name = $tableName2.name
- |
- |""".stripMargin)
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set
hoodie.spark.sql.optimized.merge.enable=$optimizedSqlEnabled")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "u1", 10.0, 100),
- Seq(3, "a3", 20.0, 100),
- Seq(2, "a2", 20.0, 200)
- )
+ spark.sql(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched and oldData.price = $tableName2.price then update
set oldData.name = $tableName2.name
+ |
+ |""".stripMargin)
- val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
- "Only simple conditions of the form `t.id = s.id` using primary key or
partition path " +
- "columns are allowed on tables with primary key. (illegal column(s)
used: `price`"
- } else {
- "Only simple conditions of the form `t.id = s.id` using primary key or
partition path " +
- "columns are allowed on tables with primary key. (illegal column(s)
used: `price`;"
- }
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "u1", 10.0, 100),
+ Seq(3, "a3", 20.0, 100),
+ Seq(2, "a2", 20.0, 200)
+ )
- checkException(
- s"""
- |merge into $tableName as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id and oldData.price =
$tableName2.price
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage)
+ val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Only simple conditions of the form `t.id = s.id` using primary key
or partition path " +
+ "columns are allowed on tables with primary key. (illegal
column(s) used: `price`"
+ } else {
+ "Only simple conditions of the form `t.id = s.id` using primary key
or partition path " +
+ "columns are allowed on tables with primary key. (illegal
column(s) used: `price`;"
+ }
- //test with multiple pks
- val tableName3 = generateTableName
- spark.sql(
- s"""
- |create table $tableName3 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName3'
- | tblproperties (
- | primaryKey ='id,name',
- | preCombineField = 'ts'
- | )
+ checkException(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id and oldData.price =
$tableName2.price
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage)
+
+ //test with multiple pks
+ val tableName3 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName3 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName3'
+ | tblproperties (
+ | primaryKey ='id,name',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
- "Hudi tables with primary key are required to match on all primary key
colums. Column: 'name' not found"
- } else {
- "Hudi tables with primary key are required to match on all primary key
colums. Column: 'name' not found;"
- }
+ val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found"
+ } else {
+ "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found;"
+ }
- checkException(
- s"""
- |merge into $tableName3 as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage2)
+ checkException(
+ s"""
+ |merge into $tableName3 as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage2)
+ }
}
}
test("Test pkless complex merge cond") {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
+ spark.sql("set hoodie.spark.sql.optimized.merge.enable=true")
Review Comment:
don't we need to fix all these?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
- val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
- optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue().toString))
- .equalsIgnoreCase("true")) {
+ val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
"false").toBoolean || optParams.getOrDefault(WRITE_PREPPED_MERGE_KEY,
"false").toBoolean) {
Review Comment:
and the config key should be
_hoodie.spark.sql.writes.prepped
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of
`org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");
+ public static final String WRITE_PREPPED_MERGE_KEY =
"_hoodie.datasource.merge.into.prepped";
Review Comment:
Let's name the variable
SPARK_SQL_MERGE_INTO_PREPPED
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -146,9 +144,7 @@ class DefaultSource extends RelationProvider
mode: SaveMode,
optParams: Map[String, String],
rawDf: DataFrame): BaseRelation = {
- val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
- optParams.getOrDefault(SQL_MERGE_INTO_WRITES.key(),
SQL_MERGE_INTO_WRITES.defaultValue().toString))
- .equalsIgnoreCase("true")) {
+ val df = if (optParams.getOrDefault(DATASOURCE_WRITE_PREPPED_KEY,
"false").toBoolean || optParams.getOrDefault(WRITE_PREPPED_MERGE_KEY,
"false").toBoolean) {
Review Comment:
Let's also rename the variable
DATASOURCE_WRITE_PREPPED_KEY to
SPARK_SQL_WRITE_PREPPED_KEY
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -721,6 +721,8 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of
`org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");
+ public static final String WRITE_PREPPED_MERGE_KEY =
"_hoodie.datasource.merge.into.prepped";
Review Comment:
Sorry — let's name this
_hoodie.spark.sql.merge.into.prepped
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]