This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 922ec95239 [spark] Refactor UpdatePaimonTableCommand.writeUpdatedAndUnchangedData for row tracking
922ec95239 is described below

commit 922ec95239a91ac98019819e881064c7d454ce1f
Author: JingsongLi <[email protected]>
AuthorDate: Fri Oct 10 12:04:50 2025 +0800

    [spark] Refactor UpdatePaimonTableCommand.writeUpdatedAndUnchangedData for row tracking
---
 .../spark/commands/UpdatePaimonTableCommand.scala  | 39 +++++++++++-----------
 .../paimon/spark/sql/RowTrackingTestBase.scala     |  6 ----
 2 files changed, 20 insertions(+), 25 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index dc94777134..75749b5714 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -143,31 +143,32 @@ case class UpdatePaimonTableCommand(
   private def writeUpdatedAndUnchangedData(
       sparkSession: SparkSession,
       toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
+
+    def rowIdCol = col(ROW_ID_COLUMN)
+
+    def sequenceNumberCol = toColumn(
+      optimizedIf(
+        condition,
+        Literal(null),
+        toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+      .as(SEQUENCE_NUMBER_COLUMN)
+
     var updateColumns = updateExpressions.zip(relation.output).map {
-      case (_, origin) if origin.name == ROW_ID_COLUMN =>
-        col(ROW_ID_COLUMN)
-      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN =>
-        toColumn(
-          optimizedIf(
-            condition,
-            Literal(null),
-            toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
-          .as(SEQUENCE_NUMBER_COLUMN)
+      case (_, origin) if origin.name == ROW_ID_COLUMN => rowIdCol
+      case (_, origin) if origin.name == SEQUENCE_NUMBER_COLUMN => sequenceNumberCol
       case (update, origin) =>
         val updated = optimizedIf(condition, update, origin)
         toColumn(updated).as(origin.name, origin.metadata)
     }
 
-    if (coreOptions.rowTrackingEnabled() && !relation.outputSet.exists(_.name == ROW_ID_COLUMN)) {
-      updateColumns ++= Seq(
-        col(ROW_ID_COLUMN),
-        toColumn(
-          optimizedIf(
-            condition,
-            Literal(null),
-            toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
-          .as(SEQUENCE_NUMBER_COLUMN)
-      )
+    if (coreOptions.rowTrackingEnabled()) {
+      val outputSet = relation.outputSet
+      if (!outputSet.exists(_.name == ROW_ID_COLUMN)) {
+        updateColumns ++= Seq(rowIdCol)
+      }
+      if (!outputSet.exists(_.name == SEQUENCE_NUMBER_COLUMN)) {
+        updateColumns ++= Seq(sequenceNumberCol)
+      }
     }
 
     val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 09fa495fd9..422b51a10e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -103,12 +103,6 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
         sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
         Seq(Row(1, 1, 0, 1), Row(2, 222, 1, 3), Row(3, 3, 2, 1))
       )
-
-      sql("UPDATE t SET data = 111 WHERE _SEQUENCE_NUMBER = 1")
-      checkAnswer(
-        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
-        Seq(Row(1, 111, 0, 4), Row(2, 222, 1, 3), Row(3, 111, 2, 4))
-      )
     }
   }
 

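The behavioural point of the refactor above is that the _ROW_ID and _SEQUENCE_NUMBER metadata columns are now checked and appended independently, instead of both being gated on the absence of _ROW_ID alone. Below is a minimal, self-contained Scala sketch of that pattern: plain collections, no Spark, with object and method names that are illustrative only (not the project's API), and the rowTrackingEnabled() gate omitted for brevity.

    // Simplified model: columns are plain strings, a plan's output is a Seq.
    object RowTrackingColumnsSketch {
      val ROW_ID_COLUMN = "_ROW_ID"
      val SEQUENCE_NUMBER_COLUMN = "_SEQUENCE_NUMBER"

      // Old shape: both metadata columns appended together, gated only on _ROW_ID.
      def appendOld(output: Seq[String]): Seq[String] =
        if (!output.contains(ROW_ID_COLUMN)) {
          output ++ Seq(ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN)
        } else {
          output
        }

      // New shape: each metadata column is checked and appended independently.
      def appendNew(output: Seq[String]): Seq[String] = {
        var cols = output
        if (!cols.contains(ROW_ID_COLUMN)) cols :+= ROW_ID_COLUMN
        if (!cols.contains(SEQUENCE_NUMBER_COLUMN)) cols :+= SEQUENCE_NUMBER_COLUMN
        cols
      }

      def main(args: Array[String]): Unit = {
        // A plan that already exposes _ROW_ID but not _SEQUENCE_NUMBER: the old
        // gating skipped the sequence-number column, the new checks still add it.
        val output = Seq("id", "data", ROW_ID_COLUMN)
        println(appendOld(output)) // List(id, data, _ROW_ID)
        println(appendNew(output)) // List(id, data, _ROW_ID, _SEQUENCE_NUMBER)
      }
    }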