This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 65b0589100 [spark] Fix merge into update columns detection (#7868)
65b0589100 is described below
commit 65b0589100a0ff20a21529c606983a1d1c0aab55
Author: YeJunHao <[email protected]>
AuthorDate: Mon May 18 15:36:49 2026 +0800
[spark] Fix merge into update columns detection (#7868)
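This updates the update-column detection used by Spark MERGE INTO on
data-evolution tables so that target self-assignments are no longer
treated as modified columns when Spark adds or changes attribute
qualifiers. An assignment whose key and value are AttributeReferences
with the same exprId is now treated as a self-assignment of the same
target column, while source-side assignments such as
target_col = source_col are still included. Assignment keys that are
not top-level attributes now fail with an UnsupportedOperationException.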
---
.../action/DataEvolutionMergeIntoActionITCase.java | 1 +
.../MergeIntoPaimonDataEvolutionTable.scala | 30 ++++++++---
.../MergeIntoPaimonDataEvolutionTableTest.scala | 59 ++++++++++++++++++++++
3 files changed, 84 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 96edb8d75a..53ecb5b034 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -735,6 +735,7 @@ public class DataEvolutionMergeIntoActionITCase extends ActionITCaseBase {
put(DATA_EVOLUTION_ENABLED.key(), "true");
put("blob-field", "picture");
put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b");
+ put("sink.parallelism", "1");
}
}));
insertInto(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 6920b44015..96f8c0c5cc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -86,9 +86,8 @@ case class MergeIntoPaimonDataEvolutionTable(
action match {
case updateAction: UpdateAction =>
for (assignment <- updateAction.assignments) {
- if (!assignment.key.equals(assignment.value)) {
- val key = assignment.key.asInstanceOf[AttributeReference]
- columns ++= Seq(key)
+ if (isModifiedAssignment(assignment)) {
+ columns += assignmentKeyAttribute(assignment)
}
}
}
@@ -281,9 +280,7 @@ case class MergeIntoPaimonDataEvolutionTable(
UpdateAction.apply(
update.condition,
update.assignments.filter(
- a =>
- updateColumnsSorted.contains(
- a.key.asInstanceOf[AttributeReference])) ++ assignments))
+ a => updateColumnsSorted.contains(assignmentKeyAttribute(a))) ++ assignments))
for (action <- realUpdateActions) {
allFields ++= action.references.flatMap(r => extractFields(r)).seq
@@ -618,6 +615,27 @@ object MergeIntoPaimonDataEvolutionTable {
final private val ROW_ID_NAME = "_ROW_ID"
final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID";
+ private[commands] def isModifiedAssignment(assignment: Assignment): Boolean = {
+ !sameAttributeReference(assignment.key, assignment.value)
+ }
+
+ private[commands] def assignmentKeyAttribute(assignment: Assignment): AttributeReference = {
+ assignment.key match {
+ case key: AttributeReference => key
+ case other =>
+ throw new UnsupportedOperationException(
+ s"Unsupported update assignment key: $other. Only top-level attributes are supported.")
+ }
+ }
+
+ private def sameAttributeReference(left: Expression, right: Expression): Boolean = {
+ (left, right) match {
+ case (leftAttr: AttributeReference, rightAttr: AttributeReference) =>
+ leftAttr.sameRef(rightAttr)
+ case _ => false
+ }
+ }
+
private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = {
if (indexed.isEmpty) {
throw new IllegalArgumentException("The input sorted sequence is empty.")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala
new file mode 100644
index 0000000000..eb6373cfce
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GetStructField, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}
+
+class MergeIntoPaimonDataEvolutionTableTest extends SparkFunSuite {
+
+ test("update column detection ignores target self-assignment with different qualifiers") {
+ val targetPicture = AttributeReference("picture", BinaryType)()
+ val qualifiedTargetPicture = targetPicture.withQualifier(Seq("t"))
+
+ assert(!targetPicture.equals(qualifiedTargetPicture))
+ assert(targetPicture.sameRef(qualifiedTargetPicture))
+ assert(
+ !MergeIntoPaimonDataEvolutionTable.isModifiedAssignment(
+ Assignment(targetPicture, qualifiedTargetPicture)))
+ }
+
+ test("update column detection includes source assignment with same column name") {
+ val targetFileType = AttributeReference("file_type", StringType)()
+ val sourceFileType = AttributeReference("file_type", StringType)().withQualifier(Seq("s"))
+
+ assert(!targetFileType.sameRef(sourceFileType))
+ assert(
+ MergeIntoPaimonDataEvolutionTable.isModifiedAssignment(
+ Assignment(targetFileType, sourceFileType)))
+ }
+
+ test("update column detection rejects non top-level assignment key") {
+ val targetStruct =
+ AttributeReference("metadata", StructType(Seq(StructField("name", StringType))))()
+ val nestedKey = GetStructField(targetStruct, 0, Some("name"))
+
+ intercept[UnsupportedOperationException] {
+ MergeIntoPaimonDataEvolutionTable.assignmentKeyAttribute(
+ Assignment(nestedKey, Literal("new-name")))
+ }
+ }
+}