This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 05e8aaabbc [spark] Fix the case that exists filter expression in 
merge-into on clause (#7334)
05e8aaabbc is described below

commit 05e8aaabbcddade29388a2197e9d81ea76520ff5
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Mar 3 13:12:09 2026 +0800

    [spark] Fix the case that exists filter expression in merge-into on clause 
(#7334)
    
    Fix the following MERGE INTO statement, which failed with the analysis error shown below:
    ```sql
    MERGE INTO target tgt
    USING (
      SELECT a, b
      FROM source
      WHERE c = 'c11'
    ) AS src
    ON tgt.a = src.a AND tgt.b = src.b AND tgt.c = 'cc'
    WHEN MATCHED THEN DELETE
    ```
    
    ```
    Column '_left.__paimon_file_path' does not exist. Did you mean one of the 
following? [_left.a, _left.b, _left.c, src.a, src.b]; line 2 pos 0;
    'Project ['_left.__paimon_file_path]
    +- Join Inner, (((a#56 = a#59) AND (b#57 = b#60)) AND (c#58 = cc))
       :- SubqueryAlias _left
       :  +- Filter (c#58 = cc)
       :     +- SubqueryAlias tgt
       :        +- SubqueryAlias paimon.test.target
       :           +- RelationV2[a#56, b#57, c#58] test.target
       +- SubqueryAlias src
          +- Project [a#59, b#60]
             +- Filter (c#61 = c11)
                +- SubqueryAlias paimon.test.source
                   +- RelationV2[a#59, b#60, c#61] test.source
    ```
---
 .../spark/commands/MergeIntoPaimonTable.scala      |  8 +++++++-
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  | 23 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 491e980d0b..254c7afc8b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -142,9 +142,15 @@ case class MergeIntoPaimonTable(
         }
       }
 
+      // If there is a target-only filter, we need to output the __paimon_file_path 
metadata column explicitly.
+      val targetDSWithFilePathCol = targetOnlyCondition.fold(targetDS) {
+        condition =>
+          createDataset(sparkSession, Filter.apply(condition, 
relation.withMetadataColumns()))
+      }
+
       def findTouchedFiles0(joinType: String): Array[String] = {
         findTouchedFiles(
-          targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), 
joinType),
+          targetDSWithFilePathCol.alias("_left").join(sourceDS, 
toColumn(mergeCondition), joinType),
           sparkSession,
           "_left." + FILE_PATH_COLUMN)
       }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 28e1b63fd2..ea56f5539e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -665,6 +665,29 @@ abstract class MergeIntoTableTestBase extends 
PaimonSparkTestBase with PaimonTab
     }
   }
 
+  test(s"Paimon MergeInto: on clause has filter expression") {
+    withTable("source", "target") {
+      createTable("target", "a INT, b INT, c STRING", Seq("a"))
+      createTable("source", "a INT, b INT, c STRING", Seq("a"))
+
+      sql("INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c11'), (5, 
500, 'c55')")
+      sql("INSERT INTO target VALUES (1, 100, 'cc'), (2, 20, 'cc')")
+
+      sql("""
+            |MERGE INTO target tgt
+            |USING (
+            |  SELECT a, b
+            |  FROM source
+            |  WHERE c = 'c11'
+            |) AS src
+            |ON tgt.a = src.a AND tgt.b = src.b AND tgt.c = 'cc'
+            |WHEN MATCHED THEN DELETE
+            |""".stripMargin)
+
+      checkAnswer(sql("SELECT * FROM target ORDER BY a, b"), Row(2, 20, "cc") 
:: Nil)
+    }
+  }
+
   test(s"Paimon MergeInto: merge into with varchar") {
     withTable("source", "target") {
       createTable("source", "a INT, b VARCHAR(32)", Seq("a"))

Reply via email to