This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0824fe77b1 [spark] Support V2 DML for row-tracking append-only tables (#8094)
0824fe77b1 is described below

commit 0824fe77b16cc50a2988f1322b4a8a33a3691874
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jun 4 13:44:30 2026 +0800

    [spark] Support V2 DML for row-tracking append-only tables (#8094)
---
 .../apache/paimon/spark/sql/RowTrackingTest.scala  |  19 +++-
 .../apache/paimon/spark/sql/RowTrackingTest.scala  | 109 ++++++++++++++++++++-
 .../spark/schema/PaimonMetadataColumnBase.java}    |  23 ++++-
 .../write/PaimonV2MetadataAwareDataWriter.java     |  62 ++++++++++++
 .../scala/org/apache/paimon/spark/SparkTable.scala |  29 +++---
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  17 ++++
 .../spark/catalyst/analysis/RowLevelHelper.scala   |  18 +---
 .../rowops/PaimonSparkCopyOnWriteOperation.scala   |   9 +-
 .../paimon/spark/schema/PaimonMetadataColumn.scala |  17 +++-
 .../paimon/spark/write/PaimonBatchWriteBase.scala  |  44 +++++++--
 .../paimon/spark/write/PaimonV2DataWriter.scala    |  68 +++++++++++--
 .../paimon/spark/sql/RowTrackingTestBase.scala     |  15 +++
 .../catalyst/analysis/PureAppendOnlyScope.scala    |  50 ++++------
 .../analysis/Spark41DeleteMetadataRestore.scala    |  10 +-
 .../analysis/Spark41MergeIntoRewrite.scala         |  15 +--
 .../analysis/Spark41UpdateTableRewrite.scala       |   7 +-
 16 files changed, 413 insertions(+), 99 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index 9f96840a77..60bfd244b2 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,21 @@
 
 package org.apache.paimon.spark.sql
 
-class RowTrackingTest extends RowTrackingTestBase {}
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+
+class RowTrackingTest extends RowTrackingTestBase {
+
+  test("Row Tracking: Spark 3.5 keeps row-tracking tables on V1 DML path") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      withTable("t", "rt") {
+        sql("CREATE TABLE t (id INT, data INT)")
+        assert(SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])
+
+        sql("CREATE TABLE rt (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+        assert(!SparkTable.of(loadTable("rt")).isInstanceOf[SupportsRowLevelOperations])
+      }
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index 9f96840a77..382aa1e778 100644
--- a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
+++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,111 @@
 
 package org.apache.paimon.spark.sql
 
-class RowTrackingTest extends RowTrackingTestBase {}
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.schema.PaimonMetadataColumn
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.types.Metadata
+
+class RowTrackingTest extends RowTrackingTestBase {
+
+  test("Row Tracking: metadata columns expose Spark preserve flags") {
+    val rowIdMetadata = Metadata.fromJson(PaimonMetadataColumn.ROW_ID.metadataInJSON())
+    assert(rowIdMetadata.getBoolean("__preserve_on_delete"))
+    assert(rowIdMetadata.getBoolean("__preserve_on_update"))
+    assert(!rowIdMetadata.getBoolean("__preserve_on_reinsert"))
+
+    val sequenceNumberMetadata =
+      Metadata.fromJson(PaimonMetadataColumn.SEQUENCE_NUMBER.metadataInJSON())
+    assert(sequenceNumberMetadata.getBoolean("__preserve_on_delete"))
+    assert(!sequenceNumberMetadata.getBoolean("__preserve_on_update"))
+    assert(!sequenceNumberMetadata.getBoolean("__preserve_on_reinsert"))
+  }
+
+  test("Row Tracking: Spark 4.1 uses V2 copy-on-write for DML") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      withTable("s", "t") {
+        sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+        sql("INSERT INTO t VALUES (1, 1), (2, 2)")
+        sql("INSERT INTO t VALUES (3, 3), (4, 4)")
+
+        assertPlanContains("DELETE FROM t WHERE id = 2", "ReplaceData")
+        sql("DELETE FROM t WHERE id = 2")
+
+        assertPlanContains("UPDATE t SET data = 30 WHERE id = 3", "ReplaceData")
+        sql("UPDATE t SET data = 30 WHERE id = 3")
+
+        sql("CREATE TABLE s (id INT, data INT)")
+        sql("INSERT INTO s VALUES (3, 300), (5, 500)")
+        assertPlanContains(
+          """
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN UPDATE SET data = s.data
+            |WHEN NOT MATCHED THEN INSERT *
+            |""".stripMargin,
+          "ReplaceData"
+        )
+        sql("""
+              |MERGE INTO t
+              |USING s
+              |ON t.id = s.id
+              |WHEN MATCHED THEN UPDATE SET data = s.data
+              |WHEN NOT MATCHED THEN INSERT *
+              |""".stripMargin)
+
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(1, 1, 0, 1), Row(3, 300, 2, 5), Row(4, 4, 3, 2), Row(5, 500, 4, 5))
+        )
+      }
+    }
+  }
+
+  test("Row Tracking: nested CHAR columns do not expose V2 row-level capability") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      withTable("t") {
+        sql("""
+              |CREATE TABLE t (
+              |  id INT,
+              |  info STRUCT<name: CHAR(3), data: INT>
+              |) TBLPROPERTIES ('row-tracking.enabled' = 'true')
+              |""".stripMargin)
+
+        assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])
+      }
+    }
+  }
+
+  test("Row Tracking: Spark 4.1 restores metadata-only delete fast path") {
+    withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+      withTable("t") {
+        sql("""
+              |CREATE TABLE t (id INT, data INT, dt STRING)
+              |PARTITIONED BY (dt)
+              |TBLPROPERTIES ('row-tracking.enabled' = 'true')
+              |""".stripMargin)
+        sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p1'), (3, 3, 'p2')")
+
+        assertPlanContains("DELETE FROM t WHERE dt = 'p1'", "DeleteFromPaimonTableCommand")
+        sql("DELETE FROM t WHERE dt = 'p1'")
+
+        checkAnswer(
+          sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+          Seq(Row(3, 3, "p2", 0, 1))
+        )
+      }
+    }
+  }
+
+  private def assertPlanContains(sqlText: String, fragment: String): Unit = {
+    val plan = explain(sqlText)
+    assert(plan.contains(fragment), plan)
+  }
+
+  private def explain(sqlText: String): String = {
+    sql(s"EXPLAIN EXTENDED $sqlText").collect().map(_.getString(0)).mkString("\n")
+  }
+}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java
similarity index 56%
copy from paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
copy to paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java
index 9f96840a77..3f4cc090cd 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java
@@ -16,6 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark.schema;
 
-class RowTrackingTest extends RowTrackingTestBase {}
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+
+abstract class PaimonMetadataColumnBase implements MetadataColumn {
+
+    abstract boolean preserveOnDelete();
+
+    abstract boolean preserveOnUpdate();
+
+    abstract boolean preserveOnReinsert();
+
+    public String metadataInJSON() {
+        return "{\"__preserve_on_delete\":"
+                + preserveOnDelete()
+                + ",\"__preserve_on_update\":"
+                + preserveOnUpdate()
+                + ",\"__preserve_on_reinsert\":"
+                + preserveOnReinsert()
+                + "}";
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java
new file mode 100644
index 0000000000..fd6786a885
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import scala.Option;
+
+/**
+ * Spark 4.x calls DataWriter.write(metadata, data) for metadata-aware writes. Keep this method in
+ * Java so the common sources still compile against Spark 3.5, where that interface method does not
+ * exist; Spark 4.x compilation generates the erased bridge required by the runtime call.
+ */
+public class PaimonV2MetadataAwareDataWriter extends PaimonV2DataWriter {
+
+    public PaimonV2MetadataAwareDataWriter(
+            BatchWriteBuilder writeBuilder,
+            StructType writeSchema,
+            StructType rowTrackingWriteSchema,
+            StructType dataSchema,
+            StructType metadataSchema,
+            CoreOptions coreOptions,
+            CatalogContext catalogContext,
+            RowType paimonWriteType) {
+        super(
+                writeBuilder,
+                rowTrackingWriteSchema,
+                dataSchema,
+                coreOptions,
+                catalogContext,
+                Option.empty(),
+                Option.apply(paimonWriteType),
+                Option.apply(metadataSchema),
+                Option.apply(writeSchema));
+    }
+
+    public void write(InternalRow metadata, InternalRow data) {
+        writeWithMetadata(metadata, data);
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 0196ea6404..9ea20de190 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -40,14 +40,15 @@ import java.util.{EnumSet => JEnumSet, Set => JSet}
  * If this base class implemented `SupportsRowLevelOperations`, Spark 4.1 would immediately call
  * `newRowLevelOperationBuilder` on tables whose V2 write is disabled (e.g. dynamic bucket or
  * primary-key tables that fall back to V1 write) and fail before Paimon has a chance to rewrite the
- * plan to a V1 command. Likewise, deletion-vector, row-tracking, and data-evolution tables need to
- * stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not expose
- * `SupportsRowLevelOperations`.
+ * plan to a V1 command. Likewise, deletion-vector, data-evolution, and fixed-length CHAR tables
+ * need to stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must also not
+ * expose `SupportsRowLevelOperations`.
  *
  * Tables that DO support V2 row-level operations use the [[SparkTableWithRowLevelOps]] subclass
  * instead; the [[SparkTable.of]] factory picks the right variant via
- * [[SparkTable.supportsV2RowLevelOps]], which is kept in lockstep with
- * `RowLevelHelper.shouldFallbackToV1`.
+ * [[SparkTable.supportsV2RowLevelOps]]. Append-only tables, including row-tracking-only tables,
+ * expose `SupportsRowLevelOperations` so DELETE, UPDATE, and MERGE INTO can go through the V2
+ * copy-on-write path when the table has no PK, deletion vectors, data evolution, or CHAR columns.
  */
 case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table)
 
@@ -93,12 +94,11 @@ object SparkTable {
    * Whether the given table supports Paimon's V2 row-level operations, i.e. whether it is safe to
    * expose [[SupportsRowLevelOperations]] to Spark.
    *
-   * This must stay in sync with
-   * `org.apache.paimon.spark.catalyst.analysis.RowLevelHelper#shouldFallbackToV1` — the two
-   * predicates are logical complements. If they diverge, Spark 4.1's row-level rewrite rules (which
-   * fire in the main Resolution batch) will intercept DML on tables that Paimon expects to handle
-   * through its postHoc V1 fallback, leaving primary-key / deletion-vector / row-tracking /
-   * data-evolution tables with broken MERGE/UPDATE/DELETE dispatch.
+   * Append-only tables return `true` here so that `SparkTable.of` wraps them as
+   * `SparkTableWithRowLevelOps`, enabling Spark's V2 copy-on-write DELETE, UPDATE, and MERGE INTO
+   * paths. Row-tracking append-only tables require Spark 4.0+ because Spark 3.5 does not have the
+   * metadata-aware `DataWriter.write(metadata, data)` path needed to preserve row-tracking metadata
+   * for rewritten rows.
    *
    * Per-version shims for Spark 3.2/3.3/3.4 each ship their own
    * `org.apache.paimon.spark.SparkTable` (class + companion) that shadows this one at packaging
@@ -113,10 +113,13 @@ object SparkTable {
     if (!sparkTable.useV2Write) return false
     sparkTable.getTable match {
       case fs: FileStoreTable =>
+        val supportsRowTrackingCopyOnWrite =
+          !sparkTable.coreOptions.rowTrackingEnabled() || org.apache.spark.SPARK_VERSION >= "4.0"
         fs.primaryKeys().isEmpty &&
+        supportsRowTrackingCopyOnWrite &&
         !sparkTable.coreOptions.deletionVectorsEnabled() &&
-        !sparkTable.coreOptions.rowTrackingEnabled() &&
-        !sparkTable.coreOptions.dataEvolutionEnabled()
+        !sparkTable.coreOptions.dataEvolutionEnabled() &&
+        !SparkTypeUtils.containsCharType(fs.rowType())
       case _ => false
     }
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index c16f16a429..80a27c35ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -99,6 +99,23 @@ public class SparkTypeUtils {
         return SparkToPaimonTypeVisitor.visit(dataType);
     }
 
+    public static boolean containsCharType(org.apache.paimon.types.DataType type) {
+        if (type instanceof CharType) {
+            return true;
+        } else if (type instanceof RowType) {
+            return ((RowType) type).getFields().stream()
+                    .anyMatch(field -> containsCharType(field.type()));
+        } else if (type instanceof ArrayType) {
+            return containsCharType(((ArrayType) type).getElementType());
+        } else if (type instanceof MapType) {
+            MapType mapType = (MapType) type;
+            return containsCharType(mapType.getKeyType()) || containsCharType(mapType.getValueType());
+        } else if (type instanceof MultisetType) {
+            return containsCharType(((MultisetType) type).getElementType());
+        }
+        return false;
+    }
+
     /**
      * Prune Paimon `RowType` by required Spark `StructType`, use this method instead of {@link
      * #toPaimonType(DataType)} when need to retain the field id.
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 4bbdb8bbd8..da43948676 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -76,17 +76,14 @@ trait RowLevelHelper extends SQLConfHelper {
       }
   }
 
-  /**
-   * Determines if DataSourceV2 is not supported for the given table. This is the logical complement
-   * of [[SparkTable.supportsV2RowLevelOps]]; the two predicates must stay in sync so that Spark
-   * 4.1's row-level rewrite rules (which key on `SupportsRowLevelOperations`) and Paimon's V1
-   * postHoc fallback rules (which gate on this predicate) agree about which tables go down which
-   * path.
-   */
   protected def shouldFallbackToV1(table: SparkTable): Boolean = {
     !SparkTable.supportsV2RowLevelOps(table)
   }
 
+  // `SparkTable.supportsV2RowLevelOps` controls whether the table exposes Spark row-level
+  // capability at all. These per-operation checks are the remaining V1 fallbacks for cases Spark's
+  // V2 rewrite cannot safely handle: metadata-only DELETE, non-rewritable UPDATE/MERGE, or
+  // assignments that have not been aligned yet.
   /** Determines if DataSourceV2 delete is not supported for the given table. */
   protected def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
     shouldFallbackToV1(table) ||
@@ -106,13 +103,6 @@ trait RowLevelHelper extends SQLConfHelper {
   protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
     val relation = PaimonRelation.getPaimonRelation(m.targetTable)
     val table = relation.table.asInstanceOf[SparkTable]
-    // Note for Spark 4.1+: `shouldFallbackToV1(table)` returns `false` for pure append-only
-    // tables (no PK / RT / DE / DV), so this predicate lets the aligned `MergeIntoTable` node
-    // return untouched. Spark's own `RewriteMergeIntoTable` in the Resolution batch can't fire
-    // (`resolveOperators` short-circuits on `analyzed=true`), so the rewrite is performed by
-    // `Spark41MergeIntoRewrite` (paimon-spark4-common) which aligns + transcribes Spark's
-    // `ReplaceData` / `AppendData` branches for non-`SupportsDelta` sources. Non-append-only
-    // tables still fall back to V1 (`MergeIntoPaimonTable` / `MergeIntoPaimonDataEvolutionTable`).
     shouldFallbackToV1(table) ||
     !m.rewritable ||
     !m.aligned
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
index e1e2ddd4d9..24c3c19761 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.rowops
 
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.PaimonBaseScanBuilder
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH_COLUMN, ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
 import org.apache.paimon.spark.write.PaimonV2WriteBuilder
 import org.apache.paimon.table.FileStoreTable
 
@@ -57,6 +57,11 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera
   }
 
   override def requiredMetadataAttributes(): Array[NamedReference] = {
-    Array(Expressions.column(FILE_PATH_COLUMN))
+    val base = Array(Expressions.column(FILE_PATH_COLUMN))
+    if (table.coreOptions().rowTrackingEnabled()) {
+      base ++ Array(Expressions.column(ROW_ID_COLUMN), Expressions.column(SEQUENCE_NUMBER_COLUMN))
+    } else {
+      base
+    }
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 34343f7380..8438d5b175 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -24,11 +24,16 @@ import org.apache.paimon.types.DataField
 
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.connector.catalog.MetadataColumn
 import org.apache.spark.sql.types.{DataType, FloatType, IntegerType, LongType, StringType, StructField, StructType}
 
-case class PaimonMetadataColumn(id: Int, override val name: String, override val dataType: DataType)
-  extends MetadataColumn {
+case class PaimonMetadataColumn(
+    id: Int,
+    override val name: String,
+    override val dataType: DataType,
+    preserveOnDelete: Boolean = true,
+    preserveOnUpdate: Boolean = true,
+    preserveOnReinsert: Boolean = false)
+  extends PaimonMetadataColumnBase {
 
   def toPaimonDataField: DataField = {
     new DataField(id, name, SparkTypeUtils.toPaimonType(dataType));
@@ -79,7 +84,11 @@ object PaimonMetadataColumn {
   val ROW_ID: PaimonMetadataColumn =
     PaimonMetadataColumn(Int.MaxValue - 104, ROW_ID_COLUMN, LongType)
   val SEQUENCE_NUMBER: PaimonMetadataColumn =
-    PaimonMetadataColumn(Int.MaxValue - 105, SEQUENCE_NUMBER_COLUMN, LongType)
+    PaimonMetadataColumn(
+      Int.MaxValue - 105,
+      SEQUENCE_NUMBER_COLUMN,
+      LongType,
+      preserveOnUpdate = false)
   val VECTOR_SEARCH_SCORE: PaimonMetadataColumn =
     PaimonMetadataColumn(Integer.MAX_VALUE - 106, VECTOR_SEARCH_SCORE_COLUMN, FloatType)
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
index d19f1a7096..42d2ebcd85 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
@@ -19,11 +19,13 @@
 package org.apache.paimon.spark.write
 
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.spark.SparkTypeUtils
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.commands.SparkDataFileMeta
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, ROW_ID, SEQUENCE_NUMBER}
+import org.apache.paimon.table.{FileStoreTable, SpecialFields}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl}
 
 import org.apache.spark.sql.PaimonSparkSession
@@ -67,18 +69,46 @@ abstract class PaimonBatchWriteBase(
     builder
   }
 
+  private val writeRowTracking: Boolean =
+    coreOptions.rowTrackingEnabled() && copyOnWriteScan.isDefined
+
+  private lazy val rtPaimonWriteType =
+    SpecialFields.rowTypeWithRowTracking(table.rowType(), false, true)
+
+  private lazy val rtWriteSchema =
+    SparkTypeUtils.fromPaimonRowType(rtPaimonWriteType)
+
+  private lazy val rtMetadataSchema =
+    StructType(Seq(FILE_PATH, ROW_ID, SEQUENCE_NUMBER).map(_.toStructField))
+
   protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
     (_: Int, _: Long) =>
       {
-        PaimonV2DataWriter(
-          batchWriteBuilder,
-          writeSchema,
-          dataSchema,
-          coreOptions,
-          catalogContextForBlobDescriptor)
+        if (writeRowTracking) {
+          createPaimonMetadataAwareDataWriter()
+        } else {
+          PaimonV2DataWriter(
+            batchWriteBuilder,
+            writeSchema,
+            dataSchema,
+            coreOptions,
+            catalogContextForBlobDescriptor)
+        }
       }
   }
 
+  private def createPaimonMetadataAwareDataWriter(): PaimonV2DataWriter = {
+    new PaimonV2MetadataAwareDataWriter(
+      batchWriteBuilder,
+      writeSchema,
+      rtWriteSchema,
+      dataSchema,
+      rtMetadataSchema,
+      coreOptions,
+      catalogContextForBlobDescriptor,
+      rtPaimonWriteType)
+  }
+
   protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = {
     commitStarted = true
     logInfo(s"Committing to table ${table.name()}")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index aa2dfcdf8f..eadd056cf2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -23,8 +23,11 @@ import org.apache.paimon.catalog.CatalogContext
 import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, TableWriteImpl}
+import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.IOUtils
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.types.StructType
 
@@ -36,7 +39,10 @@ case class PaimonV2DataWriter(
     dataSchema: StructType,
     coreOptions: CoreOptions,
     catalogContext: CatalogContext,
-    batchId: Option[Long] = None)
+    batchId: Option[Long] = None,
+    paimonWriteType: Option[RowType] = None,
+    metadataSchema: Option[StructType] = None,
+    plainWriteSchema: Option[StructType] = None)
   extends abstractInnerTableDataWrite[InternalRow]
   with InnerTableV2DataWrite {
 
@@ -46,35 +52,79 @@ case class PaimonV2DataWriter(
   val fullCompactionDeltaCommits: Option[Int] =
     Option.apply(coreOptions.fullCompactionDeltaCommits())
 
-  val write: TableWriteImpl[InternalRow] = {
-    writeBuilder
+  private def createTableWrite(writeType: Option[RowType]): TableWriteImpl[InternalRow] = {
+    val w = writeBuilder
       .newWrite()
       .withIOManager(ioManager)
       .withMetricRegistry(metricRegistry)
       .asInstanceOf[TableWriteImpl[InternalRow]]
+    writeType.foreach(w.withWriteType)
+    w
   }
 
-  private val rowConverter: InternalRow => SparkInternalRowWrapper = {
+  val write: TableWriteImpl[InternalRow] = createTableWrite(paimonWriteType)
+
+  private var plainWrite: Option[TableWriteImpl[InternalRow]] = None
+
+  private def getPlainWrite: TableWriteImpl[InternalRow] = {
+    plainWrite.getOrElse {
+      val w = createTableWrite(None)
+      plainWrite = Some(w)
+      w
+    }
+  }
+
+  private def createRowConverter(
+      writeSchema: StructType,
+      schema: StructType): InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
     val reusableWrapper =
-      new SparkInternalRowWrapper(writeSchema, numFields, dataSchema, catalogContext)
+      new SparkInternalRowWrapper(writeSchema, numFields, schema, catalogContext)
     record => reusableWrapper.replace(record)
   }
 
+  private val rowConverter: InternalRow => SparkInternalRowWrapper =
+    createRowConverter(writeSchema, dataSchema)
+
+  private val plainRowConverter: Option[InternalRow => SparkInternalRowWrapper] =
+    plainWriteSchema.map(schema => createRowConverter(schema, dataSchema))
+
+  private val metadataAwareRowConverter: Option[InternalRow => SparkInternalRowWrapper] =
+    metadataSchema.map(
+      schema => createRowConverter(writeSchema, StructType(dataSchema.fields ++ schema.fields)))
+
+  private val joinedRow = new JoinedRow()
+
   override def write(record: InternalRow): Unit = {
-    postWrite(write.writeAndReturn(rowConverter.apply(record)))
+    plainRowConverter match {
+      case Some(converter) =>
+        postWrite(getPlainWrite.writeAndReturn(converter.apply(record)))
+      case _ =>
+        postWrite(write.writeAndReturn(rowConverter.apply(record)))
+    }
+  }
+
+  def writeWithMetadata(metadata: InternalRow, record: InternalRow): Unit = {
+    metadataAwareRowConverter match {
+      case Some(converter) =>
+        postWrite(write.writeAndReturn(converter.apply(joinedRow(record, metadata))))
+      case None =>
+        write(record)
+    }
   }
 
   override def commitImpl(): Seq[CommitMessage] = {
-    write.prepareCommit().asScala.toSeq
+    val metadataMessages = write.prepareCommit().asScala.toSeq
+    val plainMessages = plainWrite.map(_.prepareCommit().asScala.toSeq).getOrElse(Seq.empty)
+    metadataMessages ++ plainMessages
   }
 
   override def abort(): Unit = close()
 
   override def close(): Unit = {
     try {
-      write.close()
-      ioManager.close()
+      val closeables = Seq[AutoCloseable](write) ++ plainWrite.toSeq ++ Seq(ioManager)
+      IOUtils.closeAll(closeables.asJava)
     } catch {
       case e: Exception => throw new RuntimeException(e)
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 6728f8cb54..1da318e282 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -334,6 +334,21 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar
     }
   }
 
+  test("Row Tracking: delete preserves row tracking metadata for update") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+
+      sql("DELETE FROM t WHERE id = 2")
+      sql("UPDATE t SET data = 33 WHERE _ROW_ID = 2")
+
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+        Seq(Row(1, 1, 0, 1), Row(3, 33, 2, 3))
+      )
+    }
+  }
+
   test("Row Tracking: update table") {
     withTable("t") {
       // only enable row tracking
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
index c005bd40c7..4fdf2bafc0 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
@@ -18,7 +18,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.{SparkTable, SparkTypeUtils}
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -26,14 +26,13 @@ import 
org.apache.spark.sql.execution.datasources.v2.ExtractV2Table
 
 /**
  * Shared scope predicates for the Spark 4.1 Resolution-batch row-level 
rewrite rules
- * ([[Spark41AppendOnlyRowLevelRewrite]] for UPDATE + metadata-only DELETE 
reverse-optimization,
+ * ([[Spark41UpdateTableRewrite]] for UPDATE + metadata-only DELETE 
reverse-optimization,
  * [[Spark41MergeIntoRewrite]] for MERGE).
  *
- * Both rules only intercept operations against **pure append-only** Paimon 
tables: no primary key,
- * row tracking, data evolution, deletion vectors, or fixed-length `CHAR(n)` 
columns. Tables that
- * violate any of these constraints either have a working V2 rewrite path on 
4.1 (PK / DV / RT / DE
- * go through Paimon's own postHoc V1 commands) or race with Spark's 
`CharVarcharCodegenUtils`
- * padding Project (CHAR columns — see [[hasCharColumn]]).
+ * These rules only intercept operations against Paimon tables that are valid 
for Spark's V2
+ * copy-on-write rewrite: no primary key, data evolution, deletion vectors, or 
fixed-length
+ * `CHAR(n)` columns. Row-tracking-only tables are included; tables that 
violate any of these
+ * constraints go through Paimon's postHoc V1 commands or Spark's built-in 
analysis path.
  *
  * Kept as a mix-in trait so the two rewrite objects stay 
single-responsibility (one rule per Spark
  * row-level command, mirroring Spark's own `RewriteUpdateTable` / 
`RewriteMergeIntoTable` layout)
@@ -41,36 +40,25 @@ import 
org.apache.spark.sql.execution.datasources.v2.ExtractV2Table
  */
 trait PureAppendOnlyScope {
 
-  /**
-   * Whether the target of a row-level operation is a pure append-only Paimon 
table that Spark 4.1's
-   * built-in rewrite rules can't handle (see the two rule class docs for why).
-   */
-  protected def targetsPureAppendOnly(aliasedTable: LogicalPlan): Boolean = {
+  protected def targetsV2CopyOnWriteTable(aliasedTable: LogicalPlan): Boolean 
= {
+    targetsPaimonFileStoreTable(aliasedTable) {
+      case (sparkTable, fs) =>
+        fs.primaryKeys().isEmpty &&
+        !sparkTable.coreOptions.dataEvolutionEnabled() &&
+        !sparkTable.coreOptions.deletionVectorsEnabled() &&
+        !SparkTypeUtils.containsCharType(fs.rowType())
+    }
+  }
+
+  private def targetsPaimonFileStoreTable(aliasedTable: LogicalPlan)(
+      predicate: (SparkTable, FileStoreTable) => Boolean): Boolean = {
     EliminateSubqueryAliases(aliasedTable) match {
       case ExtractV2Table(sparkTable: SparkTable) =>
         sparkTable.getTable match {
-          case fs: FileStoreTable =>
-            fs.primaryKeys().isEmpty &&
-            !sparkTable.coreOptions.rowTrackingEnabled() &&
-            !sparkTable.coreOptions.dataEvolutionEnabled() &&
-            !sparkTable.coreOptions.deletionVectorsEnabled() &&
-            !hasCharColumn(fs)
+          case fs: FileStoreTable => predicate(sparkTable, fs)
           case _ => false
         }
       case _ => false
     }
   }
-
-  /**
-   * Tables with fixed-length `CHAR(n)` columns go through Spark's
-   * `CharVarcharCodegenUtils.readSidePadding` Project that gets inserted 
between the
-   * `DataSourceV2Relation` and its consumers. If we intercept before that 
padding project settles,
-   * CheckAnalysis trips on mismatched attribute ids (see PR 7648 history). 
Let those plans fall
-   * through to Paimon's postHoc V1 fallback rules which run after the padding 
project stabilizes.
-   */
-  protected def hasCharColumn(fs: FileStoreTable): Boolean = {
-    import org.apache.paimon.types.CharType
-    import scala.collection.JavaConverters._
-    fs.rowType().getFields.asScala.exists(_.`type`().isInstanceOf[CharType])
-  }
 }
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
index 0efbe6a8bb..d21bc8098e 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
@@ -42,8 +42,8 @@ import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
  * fast path that a `DeleteFromPaimonTableCommand` would enable.
  *
  * This rule pattern-matches the `ReplaceData` Spark produced (tagged with
- * `RowLevelOperation.Command.DELETE`) and, if the target is a pure 
append-only Paimon table (see
- * [[PureAppendOnlyScope]]) and the predicate is metadata-only, rewrites back 
to
+ * `RowLevelOperation.Command.DELETE`) and, if the target is a Paimon table 
eligible for V2
+ * copy-on-write (see [[PureAppendOnlyScope]]) and the predicate is 
metadata-only, rewrites back to
  * `DeleteFromPaimonTableCommand`. Non-metadata-only DELETE is left alone 
(Spark's `ReplaceData` is
  * correct for data deletes). This is **not** a rewrite of `DeleteFromTable` — 
it's a restoration
  * layered on top of Spark's existing rewrite output, hence the `…Restore` 
naming rather than
@@ -64,8 +64,8 @@ object Spark41DeleteMetadataRestore extends 
RewriteRowLevelCommand with PureAppe
   }
 
   /**
-   * Whether a `ReplaceData` node (Spark 4.1's post-rewrite DELETE form) 
targets a pure append-only
-   * Paimon table with a metadata-only predicate, such that converting back to
+   * Whether a `ReplaceData` node (Spark 4.1's post-rewrite DELETE form) 
targets a Paimon table
+   * eligible for V2 copy-on-write with a metadata-only predicate, such that 
converting back to
    * `DeleteFromPaimonTableCommand` would let the optimizer fold to 
`TruncatePaimonTableWithFilter`.
    */
   private def isMetadataOnlyDeleteOnAppendOnlyPaimon(rd: ReplaceData): Boolean 
= {
@@ -78,7 +78,7 @@ object Spark41DeleteMetadataRestore extends 
RewriteRowLevelCommand with PureAppe
       case _ => false
     }
     writeIsDelete && (rd.originalTable match {
-      case r: DataSourceV2Relation if targetsPureAppendOnly(r) =>
+      case r: DataSourceV2Relation if targetsV2CopyOnWriteTable(r) =>
         r.table match {
           case spk: SparkTable =>
             spk.getTable match {
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
index 0b2e6c607e..f3dd21f15f 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
@@ -38,9 +38,9 @@ import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
- * Spark 4.1-only Resolution-batch rule that rewrites MERGE INTO on pure 
append-only Paimon tables
- * (no PK / RT / DE / DV) into V2 `ReplaceData` / `AppendData` plans, 
mirroring Spark's built-in
- * `RewriteMergeIntoTable` for non-`SupportsDelta` row-level tables.
+ * Spark 4.1-only Resolution-batch rule that rewrites MERGE INTO on Paimon 
tables eligible for V2
+ * copy-on-write (no PK / DE / DV / CHAR) into V2 `ReplaceData` / `AppendData` 
plans, mirroring
+ * Spark's built-in `RewriteMergeIntoTable` for non-`SupportsDelta` row-level 
tables.
  *
  * In Spark 4.1, `RewriteMergeIntoTable` runs in the Resolution batch via 
`resolveOperators`, which
  * short-circuits on `analyzed=true` plans — by the time it would fire, the 
`MergeIntoTable` is
@@ -52,9 +52,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * We fire before `ResolveAssignments`, so `m.aligned` is `false`. The rule 
pre-aligns each action
  * list via `PaimonAssignmentUtils.alignActions` (shared with the postHoc 
`PaimonMergeInto` rule).
  *
- * CHAR columns are excluded — `readSidePadding` races with the rewrite and 
trips CheckAnalysis;
- * those plans fall back to the postHoc `PaimonMergeInto` V1 path, which also 
owns PK / RT / DE / DV
- * tables via `RowLevelHelper.shouldFallbackToV1MergeInto`.
+ * Row-tracking-only tables use the same V2 copy-on-write rewrite. CHAR 
columns are excluded —
+ * `readSidePadding` races with the rewrite and trips CheckAnalysis; those 
plans fall back to the
+ * postHoc `PaimonMergeInto` V1 path, which also owns PK / DE / DV tables via
+ * `RowLevelHelper.shouldFallbackToV1MergeInto`.
  */
 object Spark41MergeIntoRewrite
   extends RewriteRowLevelCommand
@@ -71,7 +72,7 @@ object Spark41MergeIntoRewrite
       plan.transformDown {
         case m: MergeIntoTable
             if m.resolved && m.rewritable && !m.needSchemaEvolution &&
-              targetsPureAppendOnly(m.targetTable) =>
+              targetsV2CopyOnWriteTable(m.targetTable) =>
           // Pure append-only tables skip postHoc `PaimonMergeInto`, so evolve 
schema here.
           val evolved = evolveSchemaIfPaimon(m)
           rewrite(alignAllMergeActions(evolved, evolved.targetTable.output))
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
index d8082b592e..97edbdc780 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
@@ -47,8 +47,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * We fire before `ResolveAssignments`, so `u.aligned` is `false`; the rule 
pre-aligns via
  * `PaimonAssignmentUtils.alignUpdateAssignments` before building the plan.
  *
- * PK tables go through the postHoc rule; RT / DE / DV tables go through 
Spark's V2 path. DELETE is
- * handled by [[Spark41DeleteMetadataRestore]]; MERGE by 
[[Spark41MergeIntoRewrite]].
+ * Row-tracking-only tables use the same V2 copy-on-write rewrite. PK / DE / 
DV tables go through
+ * the postHoc V1 rule because they do not expose 
`SupportsRowLevelOperations`. DELETE is handled by
+ * [[Spark41DeleteMetadataRestore]]; MERGE by [[Spark41MergeIntoRewrite]].
  */
 object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with 
PureAppendOnlyScope {
 
@@ -57,7 +58,7 @@ object Spark41UpdateTableRewrite extends 
RewriteRowLevelCommand with PureAppendO
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan.transformDown {
         case u @ UpdateTable(aliasedTable, assignments, cond)
-            if u.resolved && u.rewritable && 
targetsPureAppendOnly(aliasedTable) =>
+            if u.resolved && u.rewritable && 
targetsV2CopyOnWriteTable(aliasedTable) =>
           EliminateSubqueryAliases(aliasedTable) match {
             case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
               val table = buildOperationTable(tbl, UPDATE, 
CaseInsensitiveStringMap.empty())


Reply via email to