amrishlal commented on code in PR #9203:
URL: https://github.com/apache/hudi/pull/9203#discussion_r1267608249
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala:
##########
@@ -22,122 +22,128 @@ import org.apache.hudi.{HoodieSparkUtils, ScalaAssertionSupport}
class TestMergeIntoTableWithNonRecordKeyField extends HoodieSparkSqlTestBase
with ScalaAssertionSupport {
test("Test Merge into extra cond") {
- withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ Seq(true, false).foreach { optimizedSqlEnabled =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- val tableName2 = generateTableName
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName2'
- | tblproperties (
- | primaryKey ='id',
- | preCombineField = 'ts'
- | )
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName2'
+ | tblproperties (
+ | primaryKey ='id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(
- s"""
- |insert into $tableName values
- | (1, 'a1', 10, 100),
- | (2, 'a2', 20, 200),
- | (3, 'a3', 20, 100)
- |""".stripMargin)
- spark.sql(
- s"""
- |insert into $tableName2 values
- | (1, 'u1', 10, 999),
- | (3, 'u3', 30, 9999),
- | (4, 'u4', 40, 99999)
- |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName values
+ | (1, 'a1', 10, 100),
+ | (2, 'a2', 20, 200),
+ | (3, 'a3', 20, 100)
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName2 values
+ | (1, 'u1', 10, 999),
+ | (3, 'u3', 30, 9999),
+ | (4, 'u4', 40, 99999)
+ |""".stripMargin)
- spark.sql(
- s"""
- |merge into $tableName as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched and oldData.price = $tableName2.price then update set oldData.name = $tableName2.name
- |
- |""".stripMargin)
+ // test with optimized sql merge enabled / disabled.
+ spark.sql(s"set hoodie.spark.sql.optimized.merge.enable=$optimizedSqlEnabled")
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "u1", 10.0, 100),
- Seq(3, "a3", 20.0, 100),
- Seq(2, "a2", 20.0, 200)
- )
+ spark.sql(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched and oldData.price = $tableName2.price then update set oldData.name = $tableName2.name
+ |
+ |""".stripMargin)
- val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
- "Only simple conditions of the form `t.id = s.id` using primary key or partition path " +
- "columns are allowed on tables with primary key. (illegal column(s) used: `price`"
- } else {
- "Only simple conditions of the form `t.id = s.id` using primary key or partition path " +
- "columns are allowed on tables with primary key. (illegal column(s) used: `price`;"
- }
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "u1", 10.0, 100),
+ Seq(3, "a3", 20.0, 100),
+ Seq(2, "a2", 20.0, 200)
+ )
- checkException(
- s"""
- |merge into $tableName as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id and oldData.price = $tableName2.price
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage)
+ val errorMessage = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Only simple conditions of the form `t.id = s.id` using primary key or partition path " +
+ "columns are allowed on tables with primary key. (illegal column(s) used: `price`"
+ } else {
+ "Only simple conditions of the form `t.id = s.id` using primary key or partition path " +
+ "columns are allowed on tables with primary key. (illegal column(s) used: `price`;"
+ }
- //test with multiple pks
- val tableName3 = generateTableName
- spark.sql(
- s"""
- |create table $tableName3 (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '${tmp.getCanonicalPath}/$tableName3'
- | tblproperties (
- | primaryKey ='id,name',
- | preCombineField = 'ts'
- | )
+ checkException(
+ s"""
+ |merge into $tableName as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id and oldData.price = $tableName2.price
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage)
+
+ //test with multiple pks
+ val tableName3 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName3 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName3'
+ | tblproperties (
+ | primaryKey ='id,name',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
- "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found"
- } else {
- "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found;"
- }
+ val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found"
+ } else {
+ "Hudi tables with primary key are required to match on all primary key colums. Column: 'name' not found;"
+ }
- checkException(
- s"""
- |merge into $tableName3 as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage2)
+ checkException(
+ s"""
+ |merge into $tableName3 as oldData
+ |using $tableName2
+ |on oldData.id = $tableName2.id
+ |when matched then update set oldData.name = $tableName2.name
+ |when not matched then insert *
+ |""".stripMargin)(errorMessage2)
+ }
}
}
test("Test pkless complex merge cond") {
withRecordType()(withTempDir { tmp =>
spark.sql("set hoodie.payload.combined.schema.validate = true")
+ spark.sql("set hoodie.spark.sql.optimized.merge.enable=true")
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]