This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bf015c71fc [spark] Fix duplicate column error when merging on _ROW_ID (#6727)
bf015c71fc is described below
commit bf015c71fc2cdad17f946d8127f3ec07f97616ec
Author: wayneli-vt <[email protected]>
AuthorDate: Wed Dec 3 14:55:35 2025 +0800
[spark] Fix duplicate column error when merging on _ROW_ID (#6727)
---
.../MergeIntoPaimonDataEvolutionTable.scala | 19 +++++++++----
.../paimon/spark/sql/RowTrackingTestBase.scala | 33 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 11cd875413..768e244a38 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -198,8 +198,16 @@ case class MergeIntoPaimonDataEvolutionTable(
     val updateColumnsSorted = updateColumns.toSeq.sortBy(
       s => targetTable.output.map(x => x.toString()).indexOf(s.toString()))
 
-    val assignments = redundantColumns.map(column => Assignment(column, column))
-    val output = updateColumnsSorted ++ redundantColumns
+    // Different Spark versions might produce duplicate attributes between `output` and
+    // `metadataOutput`, so manually deduplicate by `exprId`.
+    val metadataColumns = (targetRelation.output ++ targetRelation.metadataOutput)
+      .filter(attr => attr.name.equals(ROW_ID_NAME))
+      .groupBy(_.exprId)
+      .map { case (_, attrs) => attrs.head }
+      .toSeq
+
+    val assignments = metadataColumns.map(column => Assignment(column, column))
+    val output = updateColumnsSorted ++ metadataColumns
     val realUpdateActions = matchedActions
       .map(s => s.asInstanceOf[UpdateAction])
       .map(
@@ -217,10 +225,9 @@ case class MergeIntoPaimonDataEvolutionTable(
 
     val allReadFieldsOnTarget = allFields.filter(
       field =>
-        targetTable.output.exists(
-          attr => attr.toString().equals(field.toString()))) ++ redundantColumns
-    val allReadFieldsOnSource = allFields.filter(
-      field => sourceTable.output.exists(attr => attr.toString().equals(field.toString())))
+        targetTable.output.exists(attr => attr.exprId.equals(field.exprId))) ++ metadataColumns
+    val allReadFieldsOnSource =
+      allFields.filter(field => sourceTable.output.exists(attr => attr.exprId.equals(field.exprId)))
 
     val targetReadPlan =
       touchedFileTargetRelation.copy(targetRelation.table, allReadFieldsOnTarget.toSeq)
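
For readers skimming the diff: the core of the fix is keeping only one attribute per `exprId` when `output` and `metadataOutput` overlap. Below is a minimal standalone sketch of that dedup idiom in plain Scala, using a hypothetical `Attr` case class in place of Catalyst's `AttributeReference` (the real class carries an ExprId that uniquely identifies a resolved attribute); it is an illustration of the technique, not the Paimon code itself:

    // Hypothetical stand-in for Spark's AttributeReference.
    case class Attr(name: String, exprId: Long)

    object DedupByExprId extends App {
      val output         = Seq(Attr("a", 1L), Attr("_ROW_ID", 7L))
      // Some Spark versions surface _ROW_ID via metadataOutput as well,
      // with the same exprId as the copy in output.
      val metadataOutput = Seq(Attr("_ROW_ID", 7L))

      val metadataColumns = (output ++ metadataOutput)
        .filter(_.name == "_ROW_ID")
        .groupBy(_.exprId)                     // duplicates share an exprId
        .map { case (_, attrs) => attrs.head } // keep one copy per exprId
        .toSeq

      println(metadataColumns) // List(Attr(_ROW_ID,7)): a single copy survives
    }

Without the groupBy step, both copies of _ROW_ID would flow into the merge plan's output, which is what triggered the duplicate-column error this commit fixes.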
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index cda852f3a4..5fb3de0e07 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -327,6 +327,39 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
     }
   }
 
+ test("Data Evolution: merge into table with data-evolution on _ROW_ID") {
+ withTable("source", "target") {
+ sql(
+ "CREATE TABLE source (a INT, b INT, c STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql(
+ "INSERT INTO source VALUES (1, 100, 'c11'), (3, 300, 'c33'), (5, 500,
'c55'), (7, 700, 'c77'), (9, 900, 'c99')")
+
+ sql(
+ "CREATE TABLE target (a INT, b INT, c STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
+ sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2'), (3, 30,
'c3')")
+
+ sql(s"""
+ |MERGE INTO target
+ |USING source
+ |ON target._ROW_ID = source._ROW_ID
+ |WHEN MATCHED AND target.a = 2 THEN UPDATE SET b = source.b +
target.b
+ |WHEN MATCHED AND source.c > 'c2' THEN UPDATE SET b = source.b, c
= source.c
+ |WHEN NOT MATCHED AND c > 'c9' THEN INSERT (a, b, c) VALUES (a, b
* 1.1, c)
+ |WHEN NOT MATCHED THEN INSERT (a, b, c) VALUES (a, b, c)
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM target ORDER BY a"),
+ Seq(
+ Row(1, 10, "c1", 0, 2),
+ Row(2, 320, "c2", 1, 2),
+ Row(3, 500, "c55", 2, 2),
+ Row(7, 700, "c77", 3, 2),
+ Row(9, 990, "c99", 4, 2))
+ )
+ }
+ }
+
test("Data Evolution: update table throws exception") {
withTable("t") {
sql(