This is an automated email from the ASF dual-hosted git repository.

aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 732bff8267 [bug][spark] fix ambiguous __paimon_file_path when merging from paimon table (#5026)
732bff8267 is described below

commit 732bff826741054c2ff8bb34b267c58a067678ae
Author: WenjunMin <[email protected]>
AuthorDate: Fri Feb 28 17:46:48 2025 +0800

    [bug][spark] fix ambiguous __paimon_file_path when merging from paimon table (#5026)
---
 .../spark/commands/MergeIntoPaimonTable.scala      | 11 +++++++----
 .../paimon/spark/commands/PaimonCommand.scala      |  5 +++--
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  | 22 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 52e704172f..3df95917ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -152,13 +152,16 @@ case class MergeIntoPaimonTable(
       }
       if (hasUpdate(matchedActions)) {
         touchedFilePathsSet ++= findTouchedFiles(
-          targetDS.join(sourceDS, toColumn(mergeCondition), "inner"),
-          sparkSession)
+          targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "inner"),
+          sparkSession,
+          "_left." + FILE_PATH_COLUMN
+        )
       }
       if (hasUpdate(notMatchedBySourceActions)) {
         touchedFilePathsSet ++= findTouchedFiles(
-          targetDS.join(sourceDS, toColumn(mergeCondition), "left_anti"),
-          sparkSession)
+          targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "left_anti"),
+          sparkSession,
+          "_left." + FILE_PATH_COLUMN)
       }
 
       val targetFilePaths: Array[String] = findTouchedFiles(targetDS, sparkSession)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index d41fd7d4d2..b83831ba21 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -127,10 +127,11 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
 
   protected def findTouchedFiles(
       dataset: Dataset[Row],
-      sparkSession: SparkSession): Array[String] = {
+      sparkSession: SparkSession,
+      identifier: String = FILE_PATH_COLUMN): Array[String] = {
     import sparkSession.implicits._
     dataset
-      .select(FILE_PATH_COLUMN)
+      .select(identifier)
       .distinct()
       .as[String]
       .collect()
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index bcd84fdc11..291945a055 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -48,6 +48,28 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     }
   }
 
+  test(s"Paimon MergeInto: two paimon table") {
+    withTable("source", "target") {
+      createTable("target", "a INT, b INT, c STRING", Seq("a"))
+      createTable("source", "a INT, b INT, c STRING", Seq("a"))
+
+      spark.sql("INSERT INTO source values (1, 100, 'c11'), (3, 300, 'c33')")
+      spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
+
+      spark.sql(s"""
+                   |MERGE INTO target
+                   |USING source
+                   |ON target.a = source.a
+                   |WHEN MATCHED THEN
+                   |UPDATE SET a = source.a, b = source.b, c = source.c
+                   |""".stripMargin)
+
+      checkAnswer(
+        spark.sql("SELECT * FROM target ORDER BY a, b"),
+        Row(1, 100, "c11") :: Row(2, 20, "c2") :: Nil)
+    }
+  }
+
   test(s"Paimon MergeInto: only delete") {
     withTable("source", "target") {
 

Reply via email to