This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 65b0589100 [spark] Fix merge into update columns detection (#7868)
65b0589100 is described below

commit 65b0589100a0ff20a21529c606983a1d1c0aab55
Author: YeJunHao <[email protected]>
AuthorDate: Mon May 18 15:36:49 2026 +0800

    [spark] Fix merge into update columns detection (#7868)
    
    This fixes how updated columns are detected for Spark merge-into on
    data evolution tables: a target self-assignment (target_col =
    target_col) is no longer treated as a modified column when Spark adds
    or changes qualifiers on the attribute. The logic now treats
    AttributeReferences with matching exprIds as the same target column,
    while still including source-side assignments such as
    target_col = source_col.
---
 .../action/DataEvolutionMergeIntoActionITCase.java |  1 +
 .../MergeIntoPaimonDataEvolutionTable.scala        | 30 ++++++++---
 .../MergeIntoPaimonDataEvolutionTableTest.scala    | 59 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 6 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
index 96edb8d75a..53ecb5b034 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -735,6 +735,7 @@ public class DataEvolutionMergeIntoActionITCase extends 
ActionITCaseBase {
                                 put(DATA_EVOLUTION_ENABLED.key(), "true");
                                 put("blob-field", "picture");
                                 put(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), 
"1 b");
+                                put("sink.parallelism", "1");
                             }
                         }));
         insertInto(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 6920b44015..96f8c0c5cc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -86,9 +86,8 @@ case class MergeIntoPaimonDataEvolutionTable(
       action match {
         case updateAction: UpdateAction =>
           for (assignment <- updateAction.assignments) {
-            if (!assignment.key.equals(assignment.value)) {
-              val key = assignment.key.asInstanceOf[AttributeReference]
-              columns ++= Seq(key)
+            if (isModifiedAssignment(assignment)) {
+              columns += assignmentKeyAttribute(assignment)
             }
           }
       }
@@ -281,9 +280,7 @@ case class MergeIntoPaimonDataEvolutionTable(
           UpdateAction.apply(
             update.condition,
             update.assignments.filter(
-              a =>
-                updateColumnsSorted.contains(
-                  a.key.asInstanceOf[AttributeReference])) ++ assignments))
+              a => updateColumnsSorted.contains(assignmentKeyAttribute(a))) ++ 
assignments))
 
     for (action <- realUpdateActions) {
       allFields ++= action.references.flatMap(r => extractFields(r)).seq
@@ -618,6 +615,27 @@ object MergeIntoPaimonDataEvolutionTable {
   final private val ROW_ID_NAME = "_ROW_ID"
   final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID";
 
+  private[commands] def isModifiedAssignment(assignment: Assignment): Boolean 
= {
+    !sameAttributeReference(assignment.key, assignment.value)
+  }
+
+  private[commands] def assignmentKeyAttribute(assignment: Assignment): 
AttributeReference = {
+    assignment.key match {
+      case key: AttributeReference => key
+      case other =>
+        throw new UnsupportedOperationException(
+          s"Unsupported update assignment key: $other. Only top-level 
attributes are supported.")
+    }
+  }
+
+  private def sameAttributeReference(left: Expression, right: Expression): 
Boolean = {
+    (left, right) match {
+      case (leftAttr: AttributeReference, rightAttr: AttributeReference) =>
+        leftAttr.sameRef(rightAttr)
+      case _ => false
+    }
+  }
+
   private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: 
Long): Long = {
     if (indexed.isEmpty) {
       throw new IllegalArgumentException("The input sorted sequence is empty.")
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala
new file mode 100644
index 0000000000..eb6373cfce
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTableTest.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GetStructField, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
StructType}
+
+class MergeIntoPaimonDataEvolutionTableTest extends SparkFunSuite {
+
+  test("update column detection ignores target self-assignment with different 
qualifiers") {
+    val targetPicture = AttributeReference("picture", BinaryType)()
+    val qualifiedTargetPicture = targetPicture.withQualifier(Seq("t"))
+
+    assert(!targetPicture.equals(qualifiedTargetPicture))
+    assert(targetPicture.sameRef(qualifiedTargetPicture))
+    assert(
+      !MergeIntoPaimonDataEvolutionTable.isModifiedAssignment(
+        Assignment(targetPicture, qualifiedTargetPicture)))
+  }
+
+  test("update column detection includes source assignment with same column 
name") {
+    val targetFileType = AttributeReference("file_type", StringType)()
+    val sourceFileType = AttributeReference("file_type", 
StringType)().withQualifier(Seq("s"))
+
+    assert(!targetFileType.sameRef(sourceFileType))
+    assert(
+      MergeIntoPaimonDataEvolutionTable.isModifiedAssignment(
+        Assignment(targetFileType, sourceFileType)))
+  }
+
+  test("update column detection rejects non top-level assignment key") {
+    val targetStruct =
+      AttributeReference("metadata", StructType(Seq(StructField("name", 
StringType))))()
+    val nestedKey = GetStructField(targetStruct, 0, Some("name"))
+
+    intercept[UnsupportedOperationException] {
+      MergeIntoPaimonDataEvolutionTable.assignmentKeyAttribute(
+        Assignment(nestedKey, Literal("new-name")))
+    }
+  }
+}

Reply via email to