This is an automated email from the ASF dual-hosted git repository.

zhiwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a94043  [HUDI-2343]Fix the exception for mergeInto when the primaryKey and preCombineField of source table and target table differ in case only (#3517)
5a94043 is described below

commit 5a94043f38fa8e23c8e0b184225230aa43873368
Author: 董可伦 <[email protected]>
AuthorDate: Tue Sep 21 22:11:52 2021 +0800

    [HUDI-2343]Fix the exception for mergeInto when the primaryKey and preCombineField of source table and target table differ in case only (#3517)
---
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  6 +-
 .../spark/sql/hudi/TestMergeIntoTable2.scala       | 69 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index c4b9aec..b22c607 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -196,9 +196,11 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
   }
 
   private def isEqualToTarget(targetColumnName: String, sourceExpression: Expression): Boolean = {
+    val sourceColNameMap = sourceDFOutput.map(attr => (attr.name.toLowerCase, attr.name)).toMap
+
     sourceExpression match {
-      case attr: AttributeReference if attr.name.equalsIgnoreCase(targetColumnName) => true
-      case Cast(attr: AttributeReference, _, _) if attr.name.equalsIgnoreCase(targetColumnName) => true
+      case attr: AttributeReference if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
+      case Cast(attr: AttributeReference, _, _) if sourceColNameMap(attr.name.toLowerCase).equals(targetColumnName) => true
       case _=> false
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index 92a2c63..30a2448 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -375,4 +375,73 @@ class TestMergeIntoTable2 extends TestHoodieSqlBase {
     }
   }
 
+  test("Test MergeInto When PrimaryKey And PreCombineField Of Source Table And Target Table Differ In Case Only") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | options (
+           |  primaryKey ='id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as ID, 'a1' as NAME, 10 as PRICE, 1000 as TS, '1' as FLAG
+           | ) s0
+           | on s0.ID = $tableName.id
+           | when matched and FLAG = '1' then update set
+           | id = s0.ID, name = s0.NAME, price = s0.PRICE, ts = s0.TS
+           | when not matched and FLAG = '1' then insert *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 10.0, 1000)
+      )
+
+      // Test the case of the column names of condition and action is different from that of source table
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 1 as ID, 'a1' as NAME, 11 as PRICE, 1001 as TS, '1' as FLAG
+           | ) s0
+           | on s0.id = $tableName.id
+           | when matched and FLAG = '1' then update set
+           | id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
+           | when not matched and FLAG = '1' then insert *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1001)
+      )
+
+      // Test the case of the column names of cast condition is different from that of source table
+      spark.sql(
+        s"""
+           | merge into $tableName
+           | using (
+           |  select 2 as ID, 'a2' as NAME, 12 as PRICE, 1002 as TS, '1' as FLAG
+           | ) s0
+           | on cast(s0.id as int) = $tableName.id
+           | when matched and FLAG = '1' then update set
+           | id = s0.id, name = s0.NAME, price = s0.PRICE, ts = s0.ts
+           | when not matched and FLAG = '1' then insert *
+           |""".stripMargin)
+      checkAnswer(s"select id, name, price, ts from $tableName")(
+        Seq(1, "a1", 11.0, 1001),
+        Seq(2, "a2", 12.0, 1002)
+      )
+    }
+  }
+
 }

Reply via email to