This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0824fe77b1 [spark] Support V2 DML for row-tracking append-only tables
(#8094)
0824fe77b1 is described below
commit 0824fe77b16cc50a2988f1322b4a8a33a3691874
Author: Kerwin Zhang <[email protected]>
AuthorDate: Thu Jun 4 13:44:30 2026 +0800
[spark] Support V2 DML for row-tracking append-only tables (#8094)
---
.../apache/paimon/spark/sql/RowTrackingTest.scala | 19 +++-
.../apache/paimon/spark/sql/RowTrackingTest.scala | 109 ++++++++++++++++++++-
.../spark/schema/PaimonMetadataColumnBase.java} | 23 ++++-
.../write/PaimonV2MetadataAwareDataWriter.java | 62 ++++++++++++
.../scala/org/apache/paimon/spark/SparkTable.scala | 29 +++---
.../org/apache/paimon/spark/SparkTypeUtils.java | 17 ++++
.../spark/catalyst/analysis/RowLevelHelper.scala | 18 +---
.../rowops/PaimonSparkCopyOnWriteOperation.scala | 9 +-
.../paimon/spark/schema/PaimonMetadataColumn.scala | 17 +++-
.../paimon/spark/write/PaimonBatchWriteBase.scala | 44 +++++++--
.../paimon/spark/write/PaimonV2DataWriter.scala | 68 +++++++++++--
.../paimon/spark/sql/RowTrackingTestBase.scala | 15 +++
.../catalyst/analysis/PureAppendOnlyScope.scala | 50 ++++------
.../analysis/Spark41DeleteMetadataRestore.scala | 10 +-
.../analysis/Spark41MergeIntoRewrite.scala | 15 +--
.../analysis/Spark41UpdateTableRewrite.scala | 7 +-
16 files changed, 413 insertions(+), 99 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index 9f96840a77..60bfd244b2 100644
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,21 @@
package org.apache.paimon.spark.sql
-class RowTrackingTest extends RowTrackingTestBase {}
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+
+class RowTrackingTest extends RowTrackingTestBase {
+
+ test("Row Tracking: Spark 3.5 keeps row-tracking tables on V1 DML path") {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ withTable("t", "rt") {
+ sql("CREATE TABLE t (id INT, data INT)")
+
assert(SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])
+
+ sql("CREATE TABLE rt (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+
assert(!SparkTable.of(loadTable("rt")).isInstanceOf[SupportsRowLevelOperations])
+ }
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
index 9f96840a77..382aa1e778 100644
---
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
+++
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
@@ -18,4 +18,111 @@
package org.apache.paimon.spark.sql
-class RowTrackingTest extends RowTrackingTestBase {}
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.schema.PaimonMetadataColumn
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.types.Metadata
+
+class RowTrackingTest extends RowTrackingTestBase {
+
+ test("Row Tracking: metadata columns expose Spark preserve flags") {
+ val rowIdMetadata =
Metadata.fromJson(PaimonMetadataColumn.ROW_ID.metadataInJSON())
+ assert(rowIdMetadata.getBoolean("__preserve_on_delete"))
+ assert(rowIdMetadata.getBoolean("__preserve_on_update"))
+ assert(!rowIdMetadata.getBoolean("__preserve_on_reinsert"))
+
+ val sequenceNumberMetadata =
+ Metadata.fromJson(PaimonMetadataColumn.SEQUENCE_NUMBER.metadataInJSON())
+ assert(sequenceNumberMetadata.getBoolean("__preserve_on_delete"))
+ assert(!sequenceNumberMetadata.getBoolean("__preserve_on_update"))
+ assert(!sequenceNumberMetadata.getBoolean("__preserve_on_reinsert"))
+ }
+
+ test("Row Tracking: Spark 4.1 uses V2 copy-on-write for DML") {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ withTable("s", "t") {
+ sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t VALUES (1, 1), (2, 2)")
+ sql("INSERT INTO t VALUES (3, 3), (4, 4)")
+
+ assertPlanContains("DELETE FROM t WHERE id = 2", "ReplaceData")
+ sql("DELETE FROM t WHERE id = 2")
+
+ assertPlanContains("UPDATE t SET data = 30 WHERE id = 3",
"ReplaceData")
+ sql("UPDATE t SET data = 30 WHERE id = 3")
+
+ sql("CREATE TABLE s (id INT, data INT)")
+ sql("INSERT INTO s VALUES (3, 300), (5, 500)")
+ assertPlanContains(
+ """
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET data = s.data
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin,
+ "ReplaceData"
+ )
+ sql("""
+ |MERGE INTO t
+ |USING s
+ |ON t.id = s.id
+ |WHEN MATCHED THEN UPDATE SET data = s.data
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(3, 300, 2, 5), Row(4, 4, 3, 2), Row(5, 500,
4, 5))
+ )
+ }
+ }
+ }
+
+ test("Row Tracking: nested CHAR columns do not expose V2 row-level
capability") {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | id INT,
+ | info STRUCT<name: CHAR(3), data: INT>
+ |) TBLPROPERTIES ('row-tracking.enabled' = 'true')
+ |""".stripMargin)
+
+
assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])
+ }
+ }
+ }
+
+ test("Row Tracking: Spark 4.1 restores metadata-only delete fast path") {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (id INT, data INT, dt STRING)
+ |PARTITIONED BY (dt)
+ |TBLPROPERTIES ('row-tracking.enabled' = 'true')
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p1'), (3, 3, 'p2')")
+
+ assertPlanContains("DELETE FROM t WHERE dt = 'p1'",
"DeleteFromPaimonTableCommand")
+ sql("DELETE FROM t WHERE dt = 'p1'")
+
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(3, 3, "p2", 0, 1))
+ )
+ }
+ }
+ }
+
+ private def assertPlanContains(sqlText: String, fragment: String): Unit = {
+ val plan = explain(sqlText)
+ assert(plan.contains(fragment), plan)
+ }
+
+ private def explain(sqlText: String): String = {
+ sql(s"EXPLAIN EXTENDED
$sqlText").collect().map(_.getString(0)).mkString("\n")
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java
similarity index 56%
copy from
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
copy to
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java
index 9f96840a77..3f4cc090cd 100644
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTest.scala
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/schema/PaimonMetadataColumnBase.java
@@ -16,6 +16,25 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.sql
+package org.apache.paimon.spark.schema;
-class RowTrackingTest extends RowTrackingTestBase {}
+import org.apache.spark.sql.connector.catalog.MetadataColumn;
+
+abstract class PaimonMetadataColumnBase implements MetadataColumn {
+
+ abstract boolean preserveOnDelete();
+
+ abstract boolean preserveOnUpdate();
+
+ abstract boolean preserveOnReinsert();
+
+ public String metadataInJSON() {
+ return "{\"__preserve_on_delete\":"
+ + preserveOnDelete()
+ + ",\"__preserve_on_update\":"
+ + preserveOnUpdate()
+ + ",\"__preserve_on_reinsert\":"
+ + preserveOnReinsert()
+ + "}";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java
new file mode 100644
index 0000000000..fd6786a885
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/write/PaimonV2MetadataAwareDataWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import scala.Option;
+
+/**
+ * Spark 4.x calls DataWriter.write(metadata, data) for metadata-aware writes.
Keep this method in
+ * Java so the common sources still compile against Spark 3.5, where that
interface method does not
+ * exist; Spark 4.x compilation generates the erased bridge required by the
runtime call.
+ */
+public class PaimonV2MetadataAwareDataWriter extends PaimonV2DataWriter {
+
+ public PaimonV2MetadataAwareDataWriter(
+ BatchWriteBuilder writeBuilder,
+ StructType writeSchema,
+ StructType rowTrackingWriteSchema,
+ StructType dataSchema,
+ StructType metadataSchema,
+ CoreOptions coreOptions,
+ CatalogContext catalogContext,
+ RowType paimonWriteType) {
+ super(
+ writeBuilder,
+ rowTrackingWriteSchema,
+ dataSchema,
+ coreOptions,
+ catalogContext,
+ Option.empty(),
+ Option.apply(paimonWriteType),
+ Option.apply(metadataSchema),
+ Option.apply(writeSchema));
+ }
+
+ public void write(InternalRow metadata, InternalRow data) {
+ writeWithMetadata(metadata, data);
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 0196ea6404..9ea20de190 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -40,14 +40,15 @@ import java.util.{EnumSet => JEnumSet, Set => JSet}
* If this base class implemented `SupportsRowLevelOperations`, Spark 4.1
would immediately call
* `newRowLevelOperationBuilder` on tables whose V2 write is disabled (e.g.
dynamic bucket or
* primary-key tables that fall back to V1 write) and fail before Paimon has a
chance to rewrite the
- * plan to a V1 command. Likewise, deletion-vector, row-tracking, and
data-evolution tables need to
- * stay on Paimon's V1 postHoc path even when `useV2Write=true`, so they must
also not expose
- * `SupportsRowLevelOperations`.
+ * plan to a V1 command. Likewise, deletion-vector, data-evolution, and fixed-length CHAR tables
+ * (and, before Spark 4.0, row-tracking tables) need to stay on Paimon's V1 postHoc path even when
+ * `useV2Write=true`, so they must also not expose `SupportsRowLevelOperations`.
*
* Tables that DO support V2 row-level operations use the
[[SparkTableWithRowLevelOps]] subclass
* instead; the [[SparkTable.of]] factory picks the right variant via
- * [[SparkTable.supportsV2RowLevelOps]], which is kept in lockstep with
- * `RowLevelHelper.shouldFallbackToV1`.
+ * [[SparkTable.supportsV2RowLevelOps]]. Append-only tables, including row-tracking-only tables on
+ * Spark 4.0+, expose `SupportsRowLevelOperations` so DELETE, UPDATE, and MERGE INTO can go through
+ * the V2 copy-on-write path when V2 write is enabled and the table has no PK, deletion vectors,
+ * data evolution, or CHAR columns (including CHAR fields nested in complex types).
*/
case class SparkTable(override val table: Table) extends
PaimonSparkTableBase(table)
@@ -93,12 +94,11 @@ object SparkTable {
* Whether the given table supports Paimon's V2 row-level operations, i.e.
whether it is safe to
* expose [[SupportsRowLevelOperations]] to Spark.
*
- * This must stay in sync with
- *
`org.apache.paimon.spark.catalyst.analysis.RowLevelHelper#shouldFallbackToV1` —
the two
- * predicates are logical complements. If they diverge, Spark 4.1's
row-level rewrite rules (which
- * fire in the main Resolution batch) will intercept DML on tables that
Paimon expects to handle
- * through its postHoc V1 fallback, leaving primary-key / deletion-vector /
row-tracking /
- * data-evolution tables with broken MERGE/UPDATE/DELETE dispatch.
+ * Returns `true` only when V2 write is enabled and the table is an append-only (no primary key)
+ * `FileStoreTable` without deletion vectors, data evolution, or CHAR columns (nested CHAR fields
+ * included). `SparkTable.of` then wraps it as `SparkTableWithRowLevelOps`, enabling Spark's V2
+ * copy-on-write DELETE, UPDATE, and MERGE INTO paths. Row-tracking append-only tables additionally
+ * require Spark 4.0+, because Spark 3.5 does not have the metadata-aware
+ * `DataWriter.write(metadata, data)` path needed to preserve row-tracking metadata for rewritten
+ * rows.
*
* Per-version shims for Spark 3.2/3.3/3.4 each ship their own
* `org.apache.paimon.spark.SparkTable` (class + companion) that shadows
this one at packaging
@@ -113,10 +113,13 @@ object SparkTable {
if (!sparkTable.useV2Write) return false
sparkTable.getTable match {
case fs: FileStoreTable =>
+ val supportsRowTrackingCopyOnWrite =
+ !sparkTable.coreOptions.rowTrackingEnabled() ||
org.apache.spark.SPARK_VERSION >= "4.0"
fs.primaryKeys().isEmpty &&
+ supportsRowTrackingCopyOnWrite &&
!sparkTable.coreOptions.deletionVectorsEnabled() &&
- !sparkTable.coreOptions.rowTrackingEnabled() &&
- !sparkTable.coreOptions.dataEvolutionEnabled()
+ !sparkTable.coreOptions.dataEvolutionEnabled() &&
+ !SparkTypeUtils.containsCharType(fs.rowType())
case _ => false
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index c16f16a429..80a27c35ac 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -99,6 +99,23 @@ public class SparkTypeUtils {
return SparkToPaimonTypeVisitor.visit(dataType);
}
+ public static boolean containsCharType(org.apache.paimon.types.DataType
type) {
+ if (type instanceof CharType) {
+ return true;
+ } else if (type instanceof RowType) {
+ return ((RowType) type).getFields().stream()
+ .anyMatch(field -> containsCharType(field.type()));
+ } else if (type instanceof ArrayType) {
+ return containsCharType(((ArrayType) type).getElementType());
+ } else if (type instanceof MapType) {
+ MapType mapType = (MapType) type;
+ return containsCharType(mapType.getKeyType()) ||
containsCharType(mapType.getValueType());
+ } else if (type instanceof MultisetType) {
+ return containsCharType(((MultisetType) type).getElementType());
+ }
+ return false;
+ }
+
/**
* Prune Paimon `RowType` by required Spark `StructType`, use this method
instead of {@link
* #toPaimonType(DataType)} when need to retain the field id.
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 4bbdb8bbd8..da43948676 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -76,17 +76,14 @@ trait RowLevelHelper extends SQLConfHelper {
}
}
- /**
- * Determines if DataSourceV2 is not supported for the given table. This is
the logical complement
- * of [[SparkTable.supportsV2RowLevelOps]]; the two predicates must stay in
sync so that Spark
- * 4.1's row-level rewrite rules (which key on `SupportsRowLevelOperations`)
and Paimon's V1
- * postHoc fallback rules (which gate on this predicate) agree about which
tables go down which
- * path.
- */
protected def shouldFallbackToV1(table: SparkTable): Boolean = {
!SparkTable.supportsV2RowLevelOps(table)
}
+ // `SparkTable.supportsV2RowLevelOps` controls whether the table exposes
Spark row-level
+ // capability at all. These per-operation checks are the remaining V1
fallbacks for cases Spark's
+ // V2 rewrite cannot safely handle: metadata-only DELETE, non-rewritable
UPDATE/MERGE, or
+ // assignments that have not been aligned yet.
/** Determines if DataSourceV2 delete is not supported for the given table.
*/
protected def shouldFallbackToV1Delete(table: SparkTable, condition:
Expression): Boolean = {
shouldFallbackToV1(table) ||
@@ -106,13 +103,6 @@ trait RowLevelHelper extends SQLConfHelper {
protected def shouldFallbackToV1MergeInto(m: MergeIntoTable): Boolean = {
val relation = PaimonRelation.getPaimonRelation(m.targetTable)
val table = relation.table.asInstanceOf[SparkTable]
- // Note for Spark 4.1+: `shouldFallbackToV1(table)` returns `false` for
pure append-only
- // tables (no PK / RT / DE / DV), so this predicate lets the aligned
`MergeIntoTable` node
- // return untouched. Spark's own `RewriteMergeIntoTable` in the Resolution
batch can't fire
- // (`resolveOperators` short-circuits on `analyzed=true`), so the rewrite
is performed by
- // `Spark41MergeIntoRewrite` (paimon-spark4-common) which aligns +
transcribes Spark's
- // `ReplaceData` / `AppendData` branches for non-`SupportsDelta` sources.
Non-append-only
- // tables still fall back to V1 (`MergeIntoPaimonTable` /
`MergeIntoPaimonDataEvolutionTable`).
shouldFallbackToV1(table) ||
!m.rewritable ||
!m.aligned
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
index e1e2ddd4d9..24c3c19761 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.rowops
import org.apache.paimon.options.Options
import org.apache.paimon.spark.PaimonBaseScanBuilder
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH_COLUMN,
ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
import org.apache.paimon.spark.write.PaimonV2WriteBuilder
import org.apache.paimon.table.FileStoreTable
@@ -57,6 +57,11 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable,
info: RowLevelOpera
}
override def requiredMetadataAttributes(): Array[NamedReference] = {
- Array(Expressions.column(FILE_PATH_COLUMN))
+ val base = Array(Expressions.column(FILE_PATH_COLUMN))
+ if (table.coreOptions().rowTrackingEnabled()) {
+ base ++ Array(Expressions.column(ROW_ID_COLUMN),
Expressions.column(SEQUENCE_NUMBER_COLUMN))
+ } else {
+ base
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 34343f7380..8438d5b175 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -24,11 +24,16 @@ import org.apache.paimon.types.DataField
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.types.{DataType, FloatType, IntegerType, LongType,
StringType, StructField, StructType}
-case class PaimonMetadataColumn(id: Int, override val name: String, override
val dataType: DataType)
- extends MetadataColumn {
+case class PaimonMetadataColumn(
+ id: Int,
+ override val name: String,
+ override val dataType: DataType,
+ preserveOnDelete: Boolean = true,
+ preserveOnUpdate: Boolean = true,
+ preserveOnReinsert: Boolean = false)
+ extends PaimonMetadataColumnBase {
def toPaimonDataField: DataField = {
new DataField(id, name, SparkTypeUtils.toPaimonType(dataType));
@@ -79,7 +84,11 @@ object PaimonMetadataColumn {
val ROW_ID: PaimonMetadataColumn =
PaimonMetadataColumn(Int.MaxValue - 104, ROW_ID_COLUMN, LongType)
val SEQUENCE_NUMBER: PaimonMetadataColumn =
- PaimonMetadataColumn(Int.MaxValue - 105, SEQUENCE_NUMBER_COLUMN, LongType)
+ PaimonMetadataColumn(
+ Int.MaxValue - 105,
+ SEQUENCE_NUMBER_COLUMN,
+ LongType,
+ preserveOnUpdate = false)
val VECTOR_SEARCH_SCORE: PaimonMetadataColumn =
PaimonMetadataColumn(Integer.MAX_VALUE - 106, VECTOR_SEARCH_SCORE_COLUMN,
FloatType)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
index d19f1a7096..42d2ebcd85 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
@@ -19,11 +19,13 @@
package org.apache.paimon.spark.write
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.commands.SparkDataFileMeta
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, ROW_ID,
SEQUENCE_NUMBER}
+import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl}
import org.apache.spark.sql.PaimonSparkSession
@@ -67,18 +69,46 @@ abstract class PaimonBatchWriteBase(
builder
}
+ private val writeRowTracking: Boolean =
+ coreOptions.rowTrackingEnabled() && copyOnWriteScan.isDefined
+
+ private lazy val rtPaimonWriteType =
+ SpecialFields.rowTypeWithRowTracking(table.rowType(), false, true)
+
+ private lazy val rtWriteSchema =
+ SparkTypeUtils.fromPaimonRowType(rtPaimonWriteType)
+
+ private lazy val rtMetadataSchema =
+ StructType(Seq(FILE_PATH, ROW_ID, SEQUENCE_NUMBER).map(_.toStructField))
+
protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
(_: Int, _: Long) =>
{
- PaimonV2DataWriter(
- batchWriteBuilder,
- writeSchema,
- dataSchema,
- coreOptions,
- catalogContextForBlobDescriptor)
+ if (writeRowTracking) {
+ createPaimonMetadataAwareDataWriter()
+ } else {
+ PaimonV2DataWriter(
+ batchWriteBuilder,
+ writeSchema,
+ dataSchema,
+ coreOptions,
+ catalogContextForBlobDescriptor)
+ }
}
}
+ private def createPaimonMetadataAwareDataWriter(): PaimonV2DataWriter = {
+ new PaimonV2MetadataAwareDataWriter(
+ batchWriteBuilder,
+ writeSchema,
+ rtWriteSchema,
+ dataSchema,
+ rtMetadataSchema,
+ coreOptions,
+ catalogContextForBlobDescriptor,
+ rtPaimonWriteType)
+ }
+
protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = {
commitStarted = true
logInfo(s"Committing to table ${table.name()}")
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index aa2dfcdf8f..eadd056cf2 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -23,8 +23,11 @@ import org.apache.paimon.catalog.CatalogContext
import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
TableWriteImpl}
+import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.IOUtils
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.types.StructType
@@ -36,7 +39,10 @@ case class PaimonV2DataWriter(
dataSchema: StructType,
coreOptions: CoreOptions,
catalogContext: CatalogContext,
- batchId: Option[Long] = None)
+ batchId: Option[Long] = None,
+ paimonWriteType: Option[RowType] = None,
+ metadataSchema: Option[StructType] = None,
+ plainWriteSchema: Option[StructType] = None)
extends abstractInnerTableDataWrite[InternalRow]
with InnerTableV2DataWrite {
@@ -46,35 +52,79 @@ case class PaimonV2DataWriter(
val fullCompactionDeltaCommits: Option[Int] =
Option.apply(coreOptions.fullCompactionDeltaCommits())
- val write: TableWriteImpl[InternalRow] = {
- writeBuilder
+ private def createTableWrite(writeType: Option[RowType]):
TableWriteImpl[InternalRow] = {
+ val w = writeBuilder
.newWrite()
.withIOManager(ioManager)
.withMetricRegistry(metricRegistry)
.asInstanceOf[TableWriteImpl[InternalRow]]
+ writeType.foreach(w.withWriteType)
+ w
}
- private val rowConverter: InternalRow => SparkInternalRowWrapper = {
+ val write: TableWriteImpl[InternalRow] = createTableWrite(paimonWriteType)
+
+ private var plainWrite: Option[TableWriteImpl[InternalRow]] = None
+
+ private def getPlainWrite: TableWriteImpl[InternalRow] = {
+ plainWrite.getOrElse {
+ val w = createTableWrite(None)
+ plainWrite = Some(w)
+ w
+ }
+ }
+
+ private def createRowConverter(
+ writeSchema: StructType,
+ schema: StructType): InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
val reusableWrapper =
- new SparkInternalRowWrapper(writeSchema, numFields, dataSchema,
catalogContext)
+ new SparkInternalRowWrapper(writeSchema, numFields, schema,
catalogContext)
record => reusableWrapper.replace(record)
}
+ private val rowConverter: InternalRow => SparkInternalRowWrapper =
+ createRowConverter(writeSchema, dataSchema)
+
+ private val plainRowConverter: Option[InternalRow =>
SparkInternalRowWrapper] =
+ plainWriteSchema.map(schema => createRowConverter(schema, dataSchema))
+
+ private val metadataAwareRowConverter: Option[InternalRow =>
SparkInternalRowWrapper] =
+ metadataSchema.map(
+ schema => createRowConverter(writeSchema, StructType(dataSchema.fields
++ schema.fields)))
+
+ private val joinedRow = new JoinedRow()
+
override def write(record: InternalRow): Unit = {
- postWrite(write.writeAndReturn(rowConverter.apply(record)))
+ plainRowConverter match {
+ case Some(converter) =>
+ postWrite(getPlainWrite.writeAndReturn(converter.apply(record)))
+ case _ =>
+ postWrite(write.writeAndReturn(rowConverter.apply(record)))
+ }
+ }
+
+ def writeWithMetadata(metadata: InternalRow, record: InternalRow): Unit = {
+ metadataAwareRowConverter match {
+ case Some(converter) =>
+ postWrite(write.writeAndReturn(converter.apply(joinedRow(record,
metadata))))
+ case None =>
+ write(record)
+ }
}
override def commitImpl(): Seq[CommitMessage] = {
- write.prepareCommit().asScala.toSeq
+ val metadataMessages = write.prepareCommit().asScala.toSeq
+ val plainMessages =
plainWrite.map(_.prepareCommit().asScala.toSeq).getOrElse(Seq.empty)
+ metadataMessages ++ plainMessages
}
override def abort(): Unit = close()
override def close(): Unit = {
try {
- write.close()
- ioManager.close()
+ val closeables = Seq[AutoCloseable](write) ++ plainWrite.toSeq ++
Seq(ioManager)
+ IOUtils.closeAll(closeables.asJava)
} catch {
case e: Exception => throw new RuntimeException(e)
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 6728f8cb54..1da318e282 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -334,6 +334,21 @@ abstract class RowTrackingTestBase extends
PaimonSparkTestBase with AdaptiveSpar
}
}
+ test("Row Tracking: delete preserves row tracking metadata for update") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, data INT) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM
range(1, 4)")
+
+ sql("DELETE FROM t WHERE id = 2")
+ sql("UPDATE t SET data = 33 WHERE _ROW_ID = 2")
+
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"),
+ Seq(Row(1, 1, 0, 1), Row(3, 33, 2, 3))
+ )
+ }
+ }
+
test("Row Tracking: update table") {
withTable("t") {
// only enable row tracking
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
index c005bd40c7..4fdf2bafc0 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PureAppendOnlyScope.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.{SparkTable, SparkTypeUtils}
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -26,14 +26,13 @@ import
org.apache.spark.sql.execution.datasources.v2.ExtractV2Table
/**
* Shared scope predicates for the Spark 4.1 Resolution-batch row-level
rewrite rules
- * ([[Spark41AppendOnlyRowLevelRewrite]] for UPDATE + metadata-only DELETE
reverse-optimization,
+ * ([[Spark41UpdateTableRewrite]] for UPDATE + metadata-only DELETE
reverse-optimization,
* [[Spark41MergeIntoRewrite]] for MERGE).
*
- * Both rules only intercept operations against **pure append-only** Paimon
tables: no primary key,
- * row tracking, data evolution, deletion vectors, or fixed-length `CHAR(n)`
columns. Tables that
- * violate any of these constraints either have a working V2 rewrite path on
4.1 (PK / DV / RT / DE
- * go through Paimon's own postHoc V1 commands) or race with Spark's
`CharVarcharCodegenUtils`
- * padding Project (CHAR columns — see [[hasCharColumn]]).
+ * These rules only intercept operations against Paimon tables that are valid
for Spark's V2
+ * copy-on-write rewrite: no primary key, data evolution, deletion vectors, or
fixed-length
+ * `CHAR(n)` columns. Row-tracking-only tables are included; tables that
violate any of these
+ * constraints go through Paimon's postHoc V1 commands or Spark's built-in
analysis path.
*
* Kept as a mix-in trait so the two rewrite objects stay
single-responsibility (one rule per Spark
* row-level command, mirroring Spark's own `RewriteUpdateTable` /
`RewriteMergeIntoTable` layout)
@@ -41,36 +40,25 @@ import
org.apache.spark.sql.execution.datasources.v2.ExtractV2Table
*/
trait PureAppendOnlyScope {
- /**
- * Whether the target of a row-level operation is a pure append-only Paimon
table that Spark 4.1's
- * built-in rewrite rules can't handle (see the two rule class docs for why).
- */
- protected def targetsPureAppendOnly(aliasedTable: LogicalPlan): Boolean = {
+ protected def targetsV2CopyOnWriteTable(aliasedTable: LogicalPlan): Boolean
= {
+ targetsPaimonFileStoreTable(aliasedTable) {
+ case (sparkTable, fs) =>
+ fs.primaryKeys().isEmpty &&
+ !sparkTable.coreOptions.dataEvolutionEnabled() &&
+ !sparkTable.coreOptions.deletionVectorsEnabled() &&
+ !SparkTypeUtils.containsCharType(fs.rowType())
+ }
+ }
+
+ private def targetsPaimonFileStoreTable(aliasedTable: LogicalPlan)(
+ predicate: (SparkTable, FileStoreTable) => Boolean): Boolean = {
EliminateSubqueryAliases(aliasedTable) match {
case ExtractV2Table(sparkTable: SparkTable) =>
sparkTable.getTable match {
- case fs: FileStoreTable =>
- fs.primaryKeys().isEmpty &&
- !sparkTable.coreOptions.rowTrackingEnabled() &&
- !sparkTable.coreOptions.dataEvolutionEnabled() &&
- !sparkTable.coreOptions.deletionVectorsEnabled() &&
- !hasCharColumn(fs)
+ case fs: FileStoreTable => predicate(sparkTable, fs)
case _ => false
}
case _ => false
}
}
-
- /**
- * Tables with fixed-length `CHAR(n)` columns go through Spark's
- * `CharVarcharCodegenUtils.readSidePadding` Project that gets inserted
between the
- * `DataSourceV2Relation` and its consumers. If we intercept before that
padding project settles,
- * CheckAnalysis trips on mismatched attribute ids (see PR 7648 history).
Let those plans fall
- * through to Paimon's postHoc V1 fallback rules which run after the padding
project stabilizes.
- */
- protected def hasCharColumn(fs: FileStoreTable): Boolean = {
- import org.apache.paimon.types.CharType
- import scala.collection.JavaConverters._
- fs.rowType().getFields.asScala.exists(_.`type`().isInstanceOf[CharType])
- }
}
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
index 0efbe6a8bb..d21bc8098e 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41DeleteMetadataRestore.scala
@@ -42,8 +42,8 @@ import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
* fast path that a `DeleteFromPaimonTableCommand` would enable.
*
* This rule pattern-matches the `ReplaceData` Spark produced (tagged with
- * `RowLevelOperation.Command.DELETE`) and, if the target is a pure
append-only Paimon table (see
- * [[PureAppendOnlyScope]]) and the predicate is metadata-only, rewrites back
to
+ * `RowLevelOperation.Command.DELETE`) and, if the target is a Paimon table
eligible for V2
+ * copy-on-write (see [[PureAppendOnlyScope]]) and the predicate is
metadata-only, rewrites back to
* `DeleteFromPaimonTableCommand`. Non-metadata-only DELETE is left alone
(Spark's `ReplaceData` is
* correct for data deletes). This is **not** a rewrite of `DeleteFromTable` —
it's a restoration
* layered on top of Spark's existing rewrite output, hence the `…Restore`
naming rather than
@@ -64,8 +64,8 @@ object Spark41DeleteMetadataRestore extends
RewriteRowLevelCommand with PureAppe
}
/**
- * Whether a `ReplaceData` node (Spark 4.1's post-rewrite DELETE form)
targets a pure append-only
- * Paimon table with a metadata-only predicate, such that converting back to
+ * Whether a `ReplaceData` node (Spark 4.1's post-rewrite DELETE form)
targets a Paimon table
+ * eligible for V2 copy-on-write with a metadata-only predicate, such that
converting back to
* `DeleteFromPaimonTableCommand` would let the optimizer fold to
`TruncatePaimonTableWithFilter`.
*/
private def isMetadataOnlyDeleteOnAppendOnlyPaimon(rd: ReplaceData): Boolean
= {
@@ -78,7 +78,7 @@ object Spark41DeleteMetadataRestore extends
RewriteRowLevelCommand with PureAppe
case _ => false
}
writeIsDelete && (rd.originalTable match {
- case r: DataSourceV2Relation if targetsPureAppendOnly(r) =>
+ case r: DataSourceV2Relation if targetsV2CopyOnWriteTable(r) =>
r.table match {
case spk: SparkTable =>
spk.getTable match {
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
index 0b2e6c607e..f3dd21f15f 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
@@ -38,9 +38,9 @@ import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
- * Spark 4.1-only Resolution-batch rule that rewrites MERGE INTO on pure
append-only Paimon tables
- * (no PK / RT / DE / DV) into V2 `ReplaceData` / `AppendData` plans,
mirroring Spark's built-in
- * `RewriteMergeIntoTable` for non-`SupportsDelta` row-level tables.
+ * Spark 4.1-only Resolution-batch rule that rewrites MERGE INTO on Paimon
tables eligible for V2
+ * copy-on-write (no PK / DE / DV / CHAR) into V2 `ReplaceData` / `AppendData`
plans, mirroring
+ * Spark's built-in `RewriteMergeIntoTable` for non-`SupportsDelta` row-level
tables.
*
* In Spark 4.1, `RewriteMergeIntoTable` runs in the Resolution batch via
`resolveOperators`, which
* short-circuits on `analyzed=true` plans — by the time it would fire, the
`MergeIntoTable` is
@@ -52,9 +52,10 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* We fire before `ResolveAssignments`, so `m.aligned` is `false`. The rule
pre-aligns each action
* list via `PaimonAssignmentUtils.alignActions` (shared with the postHoc
`PaimonMergeInto` rule).
*
- * CHAR columns are excluded — `readSidePadding` races with the rewrite and
trips CheckAnalysis;
- * those plans fall back to the postHoc `PaimonMergeInto` V1 path, which also
owns PK / RT / DE / DV
- * tables via `RowLevelHelper.shouldFallbackToV1MergeInto`.
+ * Row-tracking-only tables use the same V2 copy-on-write rewrite. CHAR
columns are excluded —
+ * `readSidePadding` races with the rewrite and trips CheckAnalysis; those
plans fall back to the
+ * postHoc `PaimonMergeInto` V1 path, which also owns PK / DE / DV tables via
+ * `RowLevelHelper.shouldFallbackToV1MergeInto`.
*/
object Spark41MergeIntoRewrite
extends RewriteRowLevelCommand
@@ -71,7 +72,7 @@ object Spark41MergeIntoRewrite
plan.transformDown {
case m: MergeIntoTable
if m.resolved && m.rewritable && !m.needSchemaEvolution &&
- targetsPureAppendOnly(m.targetTable) =>
+ targetsV2CopyOnWriteTable(m.targetTable) =>
-          // Pure append-only tables skip postHoc `PaimonMergeInto`, so evolve
schema here.
+          // V2 copy-on-write tables skip postHoc `PaimonMergeInto`, so evolve
schema here.
val evolved = evolveSchemaIfPaimon(m)
rewrite(alignAllMergeActions(evolved, evolved.targetTable.output))
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
index d8082b592e..97edbdc780 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41UpdateTableRewrite.scala
@@ -47,8 +47,9 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* We fire before `ResolveAssignments`, so `u.aligned` is `false`; the rule
pre-aligns via
* `PaimonAssignmentUtils.alignUpdateAssignments` before building the plan.
*
- * PK tables go through the postHoc rule; RT / DE / DV tables go through
Spark's V2 path. DELETE is
- * handled by [[Spark41DeleteMetadataRestore]]; MERGE by
[[Spark41MergeIntoRewrite]].
+ * Row-tracking-only tables use the same V2 copy-on-write rewrite. PK / DE / DV tables go through
+ * the postHoc V1 rule because they do not expose `SupportsRowLevelOperations`. Tables with
+ * `CHAR(n)` columns are also excluded (see [[PureAppendOnlyScope]]) and fall back to the postHoc
+ * V1 path. DELETE is handled by [[Spark41DeleteMetadataRestore]]; MERGE by
+ * [[Spark41MergeIntoRewrite]].
*/
object Spark41UpdateTableRewrite extends RewriteRowLevelCommand with
PureAppendOnlyScope {
@@ -57,7 +58,7 @@ object Spark41UpdateTableRewrite extends
RewriteRowLevelCommand with PureAppendO
AnalysisHelper.allowInvokingTransformsInAnalyzer {
plan.transformDown {
case u @ UpdateTable(aliasedTable, assignments, cond)
- if u.resolved && u.rewritable &&
targetsPureAppendOnly(aliasedTable) =>
+ if u.resolved && u.rewritable &&
targetsV2CopyOnWriteTable(aliasedTable) =>
EliminateSubqueryAliases(aliasedTable) match {
case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
val table = buildOperationTable(tbl, UPDATE,
CaseInsensitiveStringMap.empty())