This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 05e8aaabbc [spark] Fix the case that exists filter expression in
merge-into on clause (#7334)
05e8aaabbc is described below
commit 05e8aaabbcddade29388a2197e9d81ea76520ff5
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Mar 3 13:12:09 2026 +0800
[spark] Fix the case that exists filter expression in merge-into on clause
(#7334)
Fix this
```sql
MERGE INTO target tgt
USING (
SELECT a, b
FROM source
WHERE c = 'c11'
) AS src
ON tgt.a = src.a AND tgt.b = src.b AND tgt.c = 'cc'
WHEN MATCHED THEN DELETE
```
```
Column '_left.__paimon_file_path' does not exist. Did you mean one of the
following? [_left.a, _left.b, _left.c, src.a, src.b]; line 2 pos 0;
'Project ['_left.__paimon_file_path]
+- Join Inner, (((a#56 = a#59) AND (b#57 = b#60)) AND (c#58 = cc))
:- SubqueryAlias _left
: +- Filter (c#58 = cc)
: +- SubqueryAlias tgt
: +- SubqueryAlias paimon.test.target
: +- RelationV2[a#56, b#57, c#58] test.target
+- SubqueryAlias src
+- Project [a#59, b#60]
+- Filter (c#61 = c11)
+- SubqueryAlias paimon.test.source
+- RelationV2[a#59, b#60, c#61] test.source
```
---
.../spark/commands/MergeIntoPaimonTable.scala | 8 +++++++-
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 23 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 491e980d0b..254c7afc8b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -142,9 +142,15 @@ case class MergeIntoPaimonTable(
}
}
+ // If there is a filter, we need to output the __paimon_file_path
metadata column explicitly.
+ val targetDSWithFilePathCol = targetOnlyCondition.fold(targetDS) {
+ condition =>
+ createDataset(sparkSession, Filter.apply(condition,
relation.withMetadataColumns()))
+ }
+
def findTouchedFiles0(joinType: String): Array[String] = {
findTouchedFiles(
- targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition),
joinType),
+ targetDSWithFilePathCol.alias("_left").join(sourceDS,
toColumn(mergeCondition), joinType),
sparkSession,
"_left." + FILE_PATH_COLUMN)
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 28e1b63fd2..ea56f5539e 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -665,6 +665,29 @@ abstract class MergeIntoTableTestBase extends
PaimonSparkTestBase with PaimonTab
}
}
+ test(s"Paimon MergeInto: on clause has filter expression") {
+ withTable("source", "target") {
+ createTable("target", "a INT, b INT, c STRING", Seq("a"))
+ createTable("source", "a INT, b INT, c STRING", Seq("a"))
+
+ sql("INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c11'), (5,
500, 'c55')")
+ sql("INSERT INTO target VALUES (1, 100, 'cc'), (2, 20, 'cc')")
+
+ sql("""
+ |MERGE INTO target tgt
+ |USING (
+ | SELECT a, b
+ | FROM source
+ | WHERE c = 'c11'
+ |) AS src
+ |ON tgt.a = src.a AND tgt.b = src.b AND tgt.c = 'cc'
+ |WHEN MATCHED THEN DELETE
+ |""".stripMargin)
+
+ checkAnswer(sql("SELECT * FROM target ORDER BY a, b"), Row(2, 20, "cc")
:: Nil)
+ }
+ }
+
test(s"Paimon MergeInto: merge into with varchar") {
withTable("source", "target") {
createTable("source", "a INT, b VARCHAR(32)", Seq("a"))