This is an automated email from the ASF dual-hosted git repository.
aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 732bff8267 [bug][spark] fix ambiguous __paimon_file_path when merging
from paimon table (#5026)
732bff8267 is described below
commit 732bff826741054c2ff8bb34b267c58a067678ae
Author: WenjunMin <[email protected]>
AuthorDate: Fri Feb 28 17:46:48 2025 +0800
[bug][spark] fix ambiguous __paimon_file_path when merging from paimon
table (#5026)
---
.../spark/commands/MergeIntoPaimonTable.scala | 11 +++++++----
.../paimon/spark/commands/PaimonCommand.scala | 5 +++--
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 22 ++++++++++++++++++++++
3 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 52e704172f..3df95917ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -152,13 +152,16 @@ case class MergeIntoPaimonTable(
}
if (hasUpdate(matchedActions)) {
touchedFilePathsSet ++= findTouchedFiles(
- targetDS.join(sourceDS, toColumn(mergeCondition), "inner"),
- sparkSession)
+ targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "inner"),
+ sparkSession,
+ "_left." + FILE_PATH_COLUMN
+ )
}
if (hasUpdate(notMatchedBySourceActions)) {
touchedFilePathsSet ++= findTouchedFiles(
- targetDS.join(sourceDS, toColumn(mergeCondition), "left_anti"),
- sparkSession)
+ targetDS.alias("_left").join(sourceDS, toColumn(mergeCondition), "left_anti"),
+ sparkSession,
+ "_left." + FILE_PATH_COLUMN)
}
val targetFilePaths: Array[String] = findTouchedFiles(targetDS, sparkSession)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index d41fd7d4d2..b83831ba21 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -127,10 +127,11 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
protected def findTouchedFiles(
dataset: Dataset[Row],
- sparkSession: SparkSession): Array[String] = {
+ sparkSession: SparkSession,
+ identifier: String = FILE_PATH_COLUMN): Array[String] = {
import sparkSession.implicits._
dataset
- .select(FILE_PATH_COLUMN)
+ .select(identifier)
.distinct()
.as[String]
.collect()
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index bcd84fdc11..291945a055 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -48,6 +48,28 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
}
}
+ test(s"Paimon MergeInto: two paimon table") {
+ withTable("source", "target") {
+ createTable("target", "a INT, b INT, c STRING", Seq("a"))
+ createTable("source", "a INT, b INT, c STRING", Seq("a"))
+
+ spark.sql("INSERT INTO source values (1, 100, 'c11'), (3, 300, 'c33')")
+ spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
+
+ spark.sql(s"""
+ |MERGE INTO target
+ |USING source
+ |ON target.a = source.a
+ |WHEN MATCHED THEN
+ |UPDATE SET a = source.a, b = source.b, c = source.c
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("SELECT * FROM target ORDER BY a, b"),
+ Row(1, 100, "c11") :: Row(2, 20, "c2") :: Nil)
+ }
+ }
+
test(s"Paimon MergeInto: only delete") {
withTable("source", "target") {