This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a56297799e0 [HUDI-6660] For merge into use primary key constraint
when optimized writes are enabled (#9382)
a56297799e0 is described below
commit a56297799e0eebbff8159a989d6cd9acb0308c2d
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Aug 8 04:49:47 2023 +0530
[HUDI-6660] For merge into use primary key constraint when optimized
writes are enabled (#9382)
- [HUDI-6660] Relax the primary key constraint on the MERGE INTO join condition
when optimized writes are disabled (the constraint is still enforced when they are enabled)
---
.../hudi/command/MergeIntoHoodieTableCommand.scala | 3 +-
.../TestMergeIntoTableWithNonRecordKeyField.scala | 56 +++++++++++++++++-----
2 files changed, 46 insertions(+), 13 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f830c552bc8..accacd82662 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -172,7 +172,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
expressionSet.remove((attr, expr))
(attr, expr)
}
- if (resolving.isEmpty && rk._1.equals("primaryKey")) {
+ if (resolving.isEmpty && rk._1.equals("primaryKey")
+ &&
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(),
"false") == "true") {
throw new AnalysisException(s"Hudi tables with primary key are
required to match on all primary key colums. Column: '${rk._2}' not found")
}
resolving
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
index 48964b37323..bcb29809eba 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTableWithNonRecordKeyField.scala
@@ -123,20 +123,52 @@ class TestMergeIntoTableWithNonRecordKeyField extends
HoodieSparkSqlTestBase wit
| )
""".stripMargin)
- val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
- "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found"
+ spark.sql(
+ s"""
+ |insert into $tableName3 values
+ | (1, 'a1', 10, 100),
+ | (2, 'a2', 20, 200),
+ | (3, 'u3', 20, 100)
+ |""".stripMargin)
+
+ if (sparkSqlOptimizedWrites) {
+ val errorMessage2 = if (HoodieSparkUtils.gteqSpark3_1) {
+ "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found"
+ } else {
+ "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found;"
+ }
+ checkException(
+ s"""
+ | merge into $tableName3 as t0
+ | using (
+ | select * from $tableName2
+ | ) as s0
+ | on t0.id = s0.id
+ | when matched then update set id = t0.id, name = t0.name,
+ | price = t0.price, ts = s0.ts
+ | when not matched then insert (id,name,price,ts) values(s0.id,
s0.name, s0.price, s0.ts)
+ """.stripMargin)(errorMessage2)
} else {
- "Hudi tables with primary key are required to match on all primary
key colums. Column: 'name' not found;"
+ spark.sql(
+ s"""
+ | merge into $tableName3 as t0
+ | using (
+ | select * from $tableName2
+ | ) as s0
+ | on t0.id = s0.id
+ | when matched then update set id = t0.id, name = t0.name,
+ | price = t0.price, ts = s0.ts
+ | when not matched then insert (id,name,price,ts) values(s0.id,
s0.name, s0.price, s0.ts)
+ """.stripMargin
+ )
+ checkAnswer(s"select id, name, price, ts from $tableName3")(
+ Seq(1, "a1", 10.0, 100),
+ Seq(1, "u1", 10.0, 999),
+ Seq(2, "a2", 20.0, 200),
+ Seq(3, "u3", 20.0, 9999),
+ Seq(4, "u4", 40.0, 99999)
+ )
}
-
- checkException(
- s"""
- |merge into $tableName3 as oldData
- |using $tableName2
- |on oldData.id = $tableName2.id
- |when matched then update set oldData.name = $tableName2.name
- |when not matched then insert *
- |""".stripMargin)(errorMessage2)
}
}
}