This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 50ffca8fbc [spark] Refactor BatchWrite subclasses into base logic +
per-version wrappers (#7723)
50ffca8fbc is described below
commit 50ffca8fbc4774a933c25406e24afd272567448b
Author: Kerwin Zhang <[email protected]>
AuthorDate: Sat May 23 22:55:09 2026 +0800
[spark] Refactor BatchWrite subclasses into base logic + per-version
wrappers (#7723)
Follow-up to #7648 (Spark 4.1 module) and a sibling of #7721. After
landing the reverse-shim layout, two of the files under
`paimon-spark-4.0/src/main` existed only as shadows because each of their
compilation units defined a Scala class that **`extends BatchWrite`**.
Spark 4.1 added a default method
`BatchWrite.commit(WriterCommitMessage[], WriteSummary)` whose
`WriteSummary` parameter type does not exist on Spark 4.0; a class
compiled against 4.1 that mixes in `BatchWrite` carries the inherited
`commit(.., WriteSummary)` signature in its method table, which the JVM's
`ObjectStreamClass.getPrivateMethod` lazy-links during Spark task
serialization, crashing Spark 4.0 with `ClassNotFoundException:
WriteSummary`.
This PR refactors both affected classes into the same **base +
per-version wrapper** pattern:
- `PaimonBatchWrite` (used by V2 writes)
- `FormatTableBatchWrite` (used by `FormatTable` V2 writes — was
previously a `private case class` inside `PaimonFormatTable.scala`)
For each, the body lives in a new abstract base in `paimon-spark-common`
that deliberately does *not* extend `BatchWrite` (renamed protected
helpers: `commitMessages`, `abortMessages`,
`createPaimonDataWriterFactory`, `createFormatTableDataWriterFactory`).
Each per-version module (`paimon-spark3-common`, `paimon-spark4-common`,
`paimon-spark-4.0/src/main`) ships a thin wrapper that mixes in
`BatchWrite` and implements its four methods: `createBatchWriterFactory`,
`commit` and `abort` forward to the base helpers, while
`useCommitCoordinator` returns `false` directly. Routing happens through
two new `SparkShim` factories (`createPaimonBatchWrite` and
`createFormatTableBatchWrite`) so each Spark version's scalac compiles the
right `extends BatchWrite` mixin.
The Spark 4.0 shadow of `PaimonFormatTable.scala` is no longer needed
and is deleted; only the new thin `FormatTableBatchWrite.scala` wrapper
remains under `paimon-spark-4.0/src/main`.
---
.../spark/format/FormatTableBatchWrite.scala | 49 ++++++++
.../paimon/spark/write/PaimonBatchWrite.scala | 139 ++++-----------------
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 20 +++
.../spark/format/FormatTableBatchWriteBase.scala} | 95 ++++----------
.../paimon/spark/format/PaimonFormatTable.scala | 107 +---------------
...BatchWrite.scala => PaimonBatchWriteBase.scala} | 70 ++++++-----
.../apache/paimon/spark/write/PaimonV2Write.scala | 8 +-
.../apache/spark/sql/paimon/shims/SparkShim.scala | 24 ++++
.../spark/format/FormatTableBatchWrite.scala | 47 +++++++
.../paimon/spark/write/PaimonBatchWrite.scala | 59 +++++++++
.../apache/spark/sql/paimon/shims/Spark3Shim.scala | 20 +++
.../spark/format/FormatTableBatchWrite.scala | 49 ++++++++
.../paimon/spark/write/PaimonBatchWrite.scala | 61 +++++++++
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 20 +++
14 files changed, 446 insertions(+), 322 deletions(-)
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
new file mode 100644
index 0000000000..c05349c504
--- /dev/null
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark 4.0-compatible shadow of the `paimon-spark4-common`
`FormatTableBatchWrite`. Compiled
+ * against 4.0.2 so its class file's method table does not carry the
`commit(.., WriteSummary)`
+ * signature added by Spark 4.1's `BatchWrite` default method, avoiding
`ClassNotFoundException:
+ * WriteSummary` lazy-linking on 4.0 task serialization.
+ */
+class FormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType)
+ extends FormatTableBatchWriteBase(table, overwriteDynamic,
overwritePartitions, writeSchema)
+ with BatchWrite
+ with Serializable {
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ createFormatTableDataWriterFactory()
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit =
commitMessages(messages)
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit =
abortMessages(messages)
+}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index bde8f028c5..c78f783924 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -18,136 +18,45 @@
package org.apache.paimon.spark.write
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
-import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.commands.SparkDataFileMeta
-import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl}
-import org.apache.spark.sql.PaimonSparkSession
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-case class PaimonBatchWrite(
+/**
+ * Spark-4.0 shadow wrapper. Source-identical to the `paimon-spark4-common`
version but compiled
+ * against Spark 4.0.2; the maven shade order picks
`paimon-spark-4.0/target/classes` ahead of the
+ * shaded 4-common copy, so the class metadata loaded at runtime does not
include the 4.1-only
+ * `BatchWrite.commit(.., WriteSummary)` signature that triggers
`ClassNotFoundException` via
+ * `ObjectStreamClass.getPrivateMethod` during Spark task serialization.
+ */
+class PaimonBatchWrite(
table: FileStoreTable,
writeSchema: StructType,
dataSchema: StructType,
overwritePartitions: Option[Map[String, String]],
copyOnWriteScan: Option[PaimonCopyOnWriteScan])
- extends BatchWrite
- with WriteHelper {
-
- protected val metricRegistry = SparkMetricRegistry()
-
- @volatile private var commitStarted: Boolean = false
+ extends PaimonBatchWriteBase(table, writeSchema, dataSchema,
overwritePartitions, copyOnWriteScan)
+ with BatchWrite
+ with Serializable {
- protected val batchWriteBuilder: BatchWriteBuilder = {
- val builder = table.newBatchWriteBuilder()
- overwritePartitions.foreach(partitions =>
builder.withOverwrite(partitions.asJava))
- builder
- }
-
- override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
- (_: Int, _: Long) =>
- {
- PaimonV2DataWriter(
- batchWriteBuilder,
- writeSchema,
- dataSchema,
- coreOptions,
- table.catalogEnvironment().catalogContext())
- }
- }
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ createPaimonDataWriterFactory(info)
override def useCommitCoordinator(): Boolean = false
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- commitStarted = true
- logInfo(s"Committing to table ${table.name()}")
- val batchTableCommit = batchWriteBuilder.newCommit()
- batchTableCommit.withMetricRegistry(metricRegistry)
- val addCommitMessage = WriteTaskResult.merge(messages)
- val deletedCommitMessage = copyOnWriteScan match {
- case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles)
- case None => Seq.empty
- }
- val commitMessages = addCommitMessage ++ deletedCommitMessage
- try {
- val start = System.currentTimeMillis()
- batchTableCommit.commit(commitMessages.asJava)
- logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
- } finally {
- batchTableCommit.close()
- }
- postDriverMetrics()
- postCommit(commitMessages)
- }
-
- // Spark support v2 write driver metrics since 4.0, see
https://github.com/apache/spark/pull/48573
- // To ensure compatibility with 3.x, manually post driver metrics here
instead of using Spark's API.
- protected def postDriverMetrics(): Unit = {
- val spark = PaimonSparkSession.active
- // todo: find a more suitable way to get metrics.
- val commitMetrics = metricRegistry.buildSparkCommitMetrics()
- val executionId =
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- val executionMetrics = Compatibility.getExecutionMetrics(spark,
executionId.toLong).distinct
- val metricUpdates = executionMetrics.flatMap {
- m =>
- commitMetrics.find(x =>
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
- case Some(customTaskMetric) => Some((m.accumulatorId,
customTaskMetric.value()))
- case None => None
- }
- }
- SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext,
executionId, metricUpdates)
- }
-
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
- if (commitStarted) {
- logWarning(s"Skip abort cleanup for table ${table.name()} because commit
has already started")
- return
- }
+ override def commit(messages: Array[WriterCommitMessage]): Unit =
commitMessages(messages)
- logInfo(s"Aborting write to table ${table.name()}")
- val batchTableCommit = batchWriteBuilder.newCommit()
- try {
- val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
- batchTableCommit.abort(commitMessages.asJava)
- } finally {
- batchTableCommit.close()
- }
- }
-
- private def buildDeletedCommitMessage(
- deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
- logInfo(s"[V2 Write] Building deleted commit message for
${deletedFiles.size} files")
- deletedFiles
- .groupBy(f => (f.partition, f.bucket))
- .map {
- case ((partition, bucket), files) =>
- val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+ override def abort(messages: Array[WriterCommitMessage]): Unit =
abortMessages(messages)
+}
- new CommitMessageImpl(
- partition,
- bucket,
- files.head.totalBuckets,
- new DataIncrement(
- Collections.emptyList[DataFileMeta],
- deletedDataFileMetas,
- Collections.emptyList[DataFileMeta]),
- new CompactIncrement(
- Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta])
- )
- }
- .toSeq
- }
+object PaimonBatchWrite {
+ def apply(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
+ new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
}
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index e08c87d4d3..731b655cd9 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.{GenericVariant,
Variant}
import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
import
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow,
Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.format.FormatTableBatchWrite
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.write.PaimonBatchWrite
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.hadoop.conf.Configuration
@@ -38,6 +42,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn,
IdentityColumn, ResolveDefaultColumns}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -199,6 +204,21 @@ class Spark4Shim extends SparkShim {
tableCatalog.invalidateTable(ident)
}
+ override def createPaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
+ new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
+
+ override def createFormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType): BatchWrite =
+ new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions,
writeSchema)
+
override def createCTERelationRef(
cteId: Long,
resolved: Boolean,
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala
similarity index 55%
rename from
paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala
index 4f55579a68..853c7f4146 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala
@@ -18,89 +18,38 @@
package org.apache.paimon.spark.format
-import org.apache.paimon.format.csv.CsvOptions
-import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder,
SparkInternalRowWrapper}
-import org.apache.paimon.spark.write.{BaseV2WriteBuilder,
FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult}
+import org.apache.paimon.spark.SparkInternalRowWrapper
+import org.apache.paimon.spark.write.{FormatTableWriteTaskResult, V2DataWrite,
WriteTaskResult}
import org.apache.paimon.table.FormatTable
import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder,
CommitMessage}
-import org.apache.paimon.types.RowType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
TableCapability, TableCatalog}
-import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ,
BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.connector.write.{DataWriterFactory,
WriterCommitMessage}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import java.util
-import java.util.Locale
import scala.collection.JavaConverters._
-case class PaimonFormatTable(table: FormatTable)
- extends BaseTable
- with SupportsRead
- with SupportsWrite {
-
- override def capabilities(): util.Set[TableCapability] = {
- util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC,
OVERWRITE_BY_FILTER)
- }
-
- override def properties: util.Map[String, String] = {
- val properties = new util.HashMap[String, String](table.options())
- properties.put(TableCatalog.PROP_PROVIDER,
table.format.name().toLowerCase(Locale.ROOT))
- if (table.comment.isPresent) {
- properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
- }
- if (FormatTable.Format.CSV == table.format) {
- properties.put(
- "sep",
- properties.getOrDefault(
- CsvOptions.FIELD_DELIMITER.key(),
- CsvOptions.FIELD_DELIMITER.defaultValue()))
- }
- properties
- }
-
- override def newScanBuilder(caseInsensitiveStringMap:
CaseInsensitiveStringMap): ScanBuilder = {
- val scanBuilder =
FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
- scanBuilder.pruneColumns(schema)
- scanBuilder
- }
-
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- PaimonFormatTableWriterBuilder(table, info.schema)
- }
-}
-
-case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema:
StructType)
- extends BaseV2WriteBuilder(table) {
-
- override def partitionRowType(): RowType = table.partitionType
-
- override def build: Write = new Write() {
- override def toBatch: BatchWrite = {
- FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions,
writeSchema)
- }
-
- override def toStreaming: StreamingWrite = {
- throw new UnsupportedOperationException("FormatTable doesn't support
streaming write")
- }
- }
-}
-
-private case class FormatTableBatchWrite(
+/**
+ * Business logic for `FormatTable` batch writes, deliberately *not* extending
+ * `org.apache.spark.sql.connector.write.BatchWrite`. See
+ * [[org.apache.paimon.spark.write.PaimonBatchWriteBase]] for the full
rationale: Spark 4.1 added a
+ * default method `BatchWrite.commit(.., WriteSummary)` whose `WriteSummary`
parameter type is
+ * unavailable on Spark 4.0, so a class compiled against 4.1 that mixes in
`BatchWrite` triggers
+ * `ClassNotFoundException: WriteSummary` lazy-linking on 4.0 runtimes during
Spark task
+ * serialization. Keeping this base off `BatchWrite` lets common ship the
implementation once;
+ * per-version `paimon-spark{3,4}-common` modules supply a thin wrapper that
mixes in `BatchWrite`,
+ * and `paimon-spark-4.0/src/main` shadows that wrapper at the 4.0.2 compile
target.
+ */
+abstract class FormatTableBatchWriteBase(
table: FormatTable,
overwriteDynamic: Option[Boolean],
overwritePartitions: Option[Map[String, String]],
writeSchema: StructType)
- extends BatchWrite
- with Logging {
+ extends Logging
+ with Serializable {
- private val batchWriteBuilder = {
+ protected val batchWriteBuilder: BatchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
// todo: add test for static overwrite the whole table
if (overwriteDynamic.contains(true)) {
@@ -111,13 +60,11 @@ private case class FormatTableBatchWrite(
builder
}
- override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
+ protected def createFormatTableDataWriterFactory(): DataWriterFactory = {
(_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder,
writeSchema)
}
- override def useCommitCoordinator(): Boolean = false
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = {
logInfo(s"Committing to FormatTable ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
val commitMessages = WriteTaskResult.merge(messages).asJava
@@ -132,7 +79,7 @@ private case class FormatTableBatchWrite(
}
}
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ protected def abortMessages(messages: Array[WriterCommitMessage]): Unit = {
logInfo(s"Aborting write to FormatTable ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
val commitMessages = WriteTaskResult.merge(messages).asJava
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 4f55579a68..dc7effd39f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -19,27 +19,23 @@
package org.apache.paimon.spark.format
import org.apache.paimon.format.csv.CsvOptions
-import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder,
SparkInternalRowWrapper}
-import org.apache.paimon.spark.write.{BaseV2WriteBuilder,
FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult}
+import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder}
+import org.apache.paimon.spark.write.BaseV2WriteBuilder
import org.apache.paimon.table.FormatTable
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder,
CommitMessage}
import org.apache.paimon.types.RowType
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite,
TableCapability, TableCatalog}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ,
BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util
import java.util.Locale
-import scala.collection.JavaConverters._
-
case class PaimonFormatTable(table: FormatTable)
extends BaseTable
with SupportsRead
@@ -83,7 +79,8 @@ case class PaimonFormatTableWriterBuilder(table: FormatTable,
writeSchema: Struc
override def build: Write = new Write() {
override def toBatch: BatchWrite = {
- FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions,
writeSchema)
+ SparkShimLoader.shim
+ .createFormatTableBatchWrite(table, overwriteDynamic,
overwritePartitions, writeSchema)
}
override def toStreaming: StreamingWrite = {
@@ -91,97 +88,3 @@ case class PaimonFormatTableWriterBuilder(table:
FormatTable, writeSchema: Struc
}
}
}
-
-private case class FormatTableBatchWrite(
- table: FormatTable,
- overwriteDynamic: Option[Boolean],
- overwritePartitions: Option[Map[String, String]],
- writeSchema: StructType)
- extends BatchWrite
- with Logging {
-
- private val batchWriteBuilder = {
- val builder = table.newBatchWriteBuilder()
- // todo: add test for static overwrite the whole table
- if (overwriteDynamic.contains(true)) {
- builder.withOverwrite()
- } else {
- overwritePartitions.foreach(partitions =>
builder.withOverwrite(partitions.asJava))
- }
- builder
- }
-
- override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
- (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder,
writeSchema)
- }
-
- override def useCommitCoordinator(): Boolean = false
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- logInfo(s"Committing to FormatTable ${table.name()}")
- val batchTableCommit = batchWriteBuilder.newCommit()
- val commitMessages = WriteTaskResult.merge(messages).asJava
- try {
- val start = System.currentTimeMillis()
- batchTableCommit.commit(commitMessages)
- logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
- } catch {
- case e: Exception =>
- logError("Failed to commit FormatTable writes", e)
- throw e
- }
- }
-
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
- logInfo(s"Aborting write to FormatTable ${table.name()}")
- val batchTableCommit = batchWriteBuilder.newCommit()
- val commitMessages = WriteTaskResult.merge(messages).asJava
- batchTableCommit.abort(commitMessages)
- }
-}
-
-private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder,
writeSchema: StructType)
- extends V2DataWrite
- with Logging {
-
- private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow
= {
- val numFields = writeSchema.fields.length
- record => {
- new SparkInternalRowWrapper(writeSchema, numFields).replace(record)
- }
- }
-
- private val write: BatchTableWrite = batchWriteBuilder.newWrite()
-
- override def write(record: InternalRow): Unit = {
- val paimonRow = rowConverter.apply(record)
- write.write(paimonRow)
- }
-
- override def commitImpl(): Seq[CommitMessage] = {
- write.prepareCommit().asScala.toSeq
- }
-
- def buildWriteTaskResult(commitMessages: Seq[CommitMessage]):
FormatTableWriteTaskResult = {
- FormatTableWriteTaskResult(commitMessages)
- }
-
- override def commit: FormatTableWriteTaskResult = {
- super.commit.asInstanceOf[FormatTableWriteTaskResult]
- }
-
- override def abort(): Unit = {
- logInfo("Aborting FormatTable data writer")
- close()
- }
-
- override def close(): Unit = {
- try {
- write.close()
- } catch {
- case e: Exception =>
- logError("Error closing FormatTableDataWriter", e)
- throw new RuntimeException(e)
- }
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
similarity index 76%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
index 1f2abae0b0..d19f1a7096 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
@@ -27,7 +27,7 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl}
import org.apache.spark.sql.PaimonSparkSession
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
@@ -36,18 +36,30 @@ import java.util.Collections
import scala.collection.JavaConverters._
-case class PaimonBatchWrite(
- table: FileStoreTable,
- writeSchema: StructType,
- dataSchema: StructType,
- overwritePartitions: Option[Map[String, String]],
- copyOnWriteScan: Option[PaimonCopyOnWriteScan])
- extends BatchWrite
- with WriteHelper {
+/**
+ * Business logic for Paimon batch writes, deliberately *not* extending
+ * `org.apache.spark.sql.connector.write.BatchWrite`. Spark 4.1 added a
default method
+ * `BatchWrite.commit(WriterCommitMessage[], WriteSummary)` whose
`WriteSummary` parameter type does
+ * not exist on Spark 4.0; a class compiled against 4.1 that declares `extends
BatchWrite` carries
+ * the inherited `commit(.., WriteSummary)` signature in its method table,
which JVM
+ * `ObjectStreamClass.getPrivateMethod` lazy-links during Spark task
serialization, crashing on 4.0
+ * with `ClassNotFoundException: WriteSummary`. Keeping this base off
`BatchWrite` lets common ship
+ * the implementation once; per-version `paimon-spark{3,4}-common` modules
supply a thin wrapper
+ * that mixes in `BatchWrite`, and `paimon-spark-4.0/src/main` shadows that
wrapper at the 4.0.2
+ * compile target.
+ */
+abstract class PaimonBatchWriteBase(
+ val table: FileStoreTable,
+ val writeSchema: StructType,
+ val dataSchema: StructType,
+ val overwritePartitions: Option[Map[String, String]],
+ val copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+ extends WriteHelper
+ with Serializable {
protected val metricRegistry = SparkMetricRegistry()
- @volatile private var commitStarted: Boolean = false
+ @volatile protected var commitStarted: Boolean = false
protected val batchWriteBuilder: BatchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
@@ -55,7 +67,7 @@ case class PaimonBatchWrite(
builder
}
- override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
+ protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
(_: Int, _: Long) =>
{
PaimonV2DataWriter(
@@ -67,9 +79,7 @@ case class PaimonBatchWrite(
}
}
- override def useCommitCoordinator(): Boolean = false
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = {
commitStarted = true
logInfo(s"Committing to table ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
@@ -91,6 +101,22 @@ case class PaimonBatchWrite(
postCommit(commitMessages)
}
+ protected def abortMessages(messages: Array[WriterCommitMessage]): Unit = {
+ if (commitStarted) {
+ logWarning(s"Skip abort cleanup for table ${table.name()} because commit
has already started")
+ return
+ }
+
+ logInfo(s"Aborting write to table ${table.name()}")
+ val batchTableCommit = batchWriteBuilder.newCommit()
+ try {
+ val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
+ batchTableCommit.abort(commitMessages.asJava)
+ } finally {
+ batchTableCommit.close()
+ }
+ }
+
// Spark support v2 write driver metrics since 4.0, see
https://github.com/apache/spark/pull/48573
// To ensure compatibility with 3.x, manually post driver metrics here
instead of using Spark's API.
protected def postDriverMetrics(): Unit = {
@@ -109,22 +135,6 @@ case class PaimonBatchWrite(
SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext,
executionId, metricUpdates)
}
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
- if (commitStarted) {
- logWarning(s"Skip abort cleanup for table ${table.name()} because commit
has already started")
- return
- }
-
- logInfo(s"Aborting write to table ${table.name()}")
- val batchTableCommit = batchWriteBuilder.newCommit()
- try {
- val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
- batchTableCommit.abort(commitMessages.asJava)
- } finally {
- batchTableCommit.close()
- }
- }
-
private def buildDeletedCommitMessage(
deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
logInfo(s"[V2 Write] Building deleted commit message for
${deletedFiles.size} files")
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index e4676ae4af..c7a9a9ff3a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -31,6 +31,7 @@ import
org.apache.spark.sql.connector.distributions.Distribution
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.types.StructType
import scala.collection.mutable
@@ -62,7 +63,12 @@ class PaimonV2Write(
}
override def toBatch: BatchWrite = {
- PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
+ SparkShimLoader.shim.createPaimonBatchWrite(
+ table,
+ writeSchema,
+ dataSchema,
+ overwritePartitions,
+ copyOnWriteScan)
}
override def supportedCustomMetrics(): Array[CustomMetric] = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 38efd8c006..b8b26237f0 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.paimon.shims
import org.apache.paimon.data.variant.Variant
import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.SparkSession
@@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.connector.catalog.{Column, Identifier,
StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.StructType
@@ -112,6 +115,27 @@ trait SparkShim {
additionalProperties: Map[String, String],
location: Option[String]): TableSpec
+ /**
+ * Constructs the `BatchWrite` for Paimon's V2 write path; `PaimonV2Write.toBatch` returns the
+ * result of this method.
+ *
+ * The implementation lives in each per-version shim module so that the class mixing in
+ * `BatchWrite` is compiled against the right Spark minor version. Spark 4.1 added a default
+ * method `BatchWrite.commit(.., WriteSummary)`; its inherited signature triggers
+ * `ClassNotFoundException: WriteSummary` through lazy-linking on Spark 4.0 runtimes when the
+ * class is loaded for task serialization.
+ *
+ * The Spark 3 and Spark 4 shims implement this as `new PaimonBatchWrite(...)`, passing the five
+ * arguments through unchanged.
+ *
+ * @return the version-specific wrapper, typed as a plain `BatchWrite`
+ */
+ def createPaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite
+
+ /**
+ * Constructs the `BatchWrite` for `FormatTable` writes. It goes through the shim because of the
+ * same `BatchWrite` mixin problem as [[createPaimonBatchWrite]]. The Spark 3 and Spark 4 shims
+ * implement it as `new FormatTableBatchWrite(...)`, passing the four arguments through unchanged.
+ *
+ * @return the version-specific wrapper, typed as a plain `BatchWrite`
+ */
+ def createFormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType): BatchWrite
+
def createCTERelationRef(
cteId: Long,
resolved: Boolean,
diff --git
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
new file mode 100644
index 0000000000..d13c737370
--- /dev/null
+++
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-3.x thin wrapper that mixes `BatchWrite` into [[FormatTableBatchWriteBase]]. See the base
+ * class scaladoc for why the inheritance lives here rather than in `paimon-spark-common`.
+ *
+ * The wrapper holds no logic of its own: the constructor arguments go straight to the base class,
+ * and each overridden `BatchWrite` method either forwards to a base-class helper or returns a
+ * constant.
+ */
+class FormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType)
+ extends FormatTableBatchWriteBase(table, overwriteDynamic,
overwritePartitions, writeSchema)
+ with BatchWrite
+ with Serializable {
+
+ // `info` is not used; the factory comes from the argument-less base-class helper.
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ createFormatTableDataWriterFactory()
+
+ // Always false: this write does not ask Spark to use its commit coordinator.
+ override def useCommitCoordinator(): Boolean = false
+
+ // Forwards to the `commitMessages` helper.
+ override def commit(messages: Array[WriterCommitMessage]): Unit =
commitMessages(messages)
+
+ // Forwards to the `abortMessages` helper.
+ override def abort(messages: Array[WriterCommitMessage]): Unit =
abortMessages(messages)
+}
diff --git
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
new file mode 100644
index 0000000000..89eb15054c
--- /dev/null
+++
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-3.x thin wrapper that mixes `BatchWrite` into [[PaimonBatchWriteBase]]. See the base class
+ * scaladoc for why the inheritance lives here rather than in `paimon-spark-common`.
+ *
+ * The wrapper holds no logic of its own: the constructor arguments go straight to the base class,
+ * and each overridden `BatchWrite` method either forwards to a base-class helper or returns a
+ * constant.
+ */
+class PaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+ extends PaimonBatchWriteBase(table, writeSchema, dataSchema,
overwritePartitions, copyOnWriteScan)
+ with BatchWrite
+ with Serializable {
+
+ // Passes `info` on to the `createPaimonDataWriterFactory` helper.
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ createPaimonDataWriterFactory(info)
+
+ // Always false: this write does not ask Spark to use its commit coordinator.
+ override def useCommitCoordinator(): Boolean = false
+
+ // Forwards to the `commitMessages` helper.
+ override def commit(messages: Array[WriterCommitMessage]): Unit =
commitMessages(messages)
+
+ // Forwards to the `abortMessages` helper.
+ override def abort(messages: Array[WriterCommitMessage]): Unit =
abortMessages(messages)
+}
+
+/**
+ * Companion providing `PaimonBatchWrite(...)` call syntax without `new`.
+ * NOTE(review): presumably kept for call sites written when `PaimonBatchWrite` was a case
+ * class - confirm it is still needed.
+ */
+object PaimonBatchWrite {
+ /** Builds a new [[PaimonBatchWrite]] from the same five arguments as the class constructor. */
+ def apply(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
+ new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
+}
diff --git
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 1798b12410..682dc89014 100644
---
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.Variant
import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
import
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow,
Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.format.FormatTableBatchWrite
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.write.PaimonBatchWrite
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.hadoop.conf.Configuration
@@ -42,6 +46,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData,
GeneratedColumn, ResolveDe
import org.apache.spark.sql.connector.catalog.{Column, Identifier,
StagingTableCatalog, Table, TableCatalog}
import
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -185,6 +190,21 @@ class Spark3Shim extends SparkShim {
tableCatalog.invalidateTable(ident)
}
+ /**
+ * Returns a new `PaimonBatchWrite` (imported from `org.apache.paimon.spark.write`), built from
+ * the arguments unchanged and exposed to the caller as a plain `BatchWrite`.
+ */
+ override def createPaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
+ new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
+
+ /**
+ * Returns a new `FormatTableBatchWrite` (imported from `org.apache.paimon.spark.format`), built
+ * from the arguments unchanged and exposed to the caller as a plain `BatchWrite`.
+ */
+ override def createFormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType): BatchWrite =
+ new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions,
writeSchema)
+
override def createCTERelationRef(
cteId: Long,
resolved: Boolean,
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
new file mode 100644
index 0000000000..4fa58c95a7
--- /dev/null
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-4.x thin wrapper that mixes `BatchWrite` into [[FormatTableBatchWriteBase]]. See the base
+ * class scaladoc for why the inheritance lives here rather than in `paimon-spark-common`. A
+ * duplicate of this file lives at `paimon-spark-4.0/src/main` so that a 4.0.2 compile target
+ * produces a class file whose method table does not reference `WriteSummary` (4.1-only).
+ *
+ * The wrapper holds no logic of its own: the constructor arguments go straight to the base class,
+ * and each overridden `BatchWrite` method either forwards to a base-class helper or returns a
+ * constant.
+ */
+class FormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType)
+ extends FormatTableBatchWriteBase(table, overwriteDynamic,
overwritePartitions, writeSchema)
+ with BatchWrite
+ with Serializable {
+
+ // `info` is not used; the factory comes from the argument-less base-class helper.
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ createFormatTableDataWriterFactory()
+
+ // Always false: this write does not ask Spark to use its commit coordinator.
+ override def useCommitCoordinator(): Boolean = false
+
+ // Forwards to the `commitMessages` helper.
+ override def commit(messages: Array[WriterCommitMessage]): Unit =
commitMessages(messages)
+
+ // Forwards to the `abortMessages` helper.
+ override def abort(messages: Array[WriterCommitMessage]): Unit =
abortMessages(messages)
+}
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
new file mode 100644
index 0000000000..38683e79c0
--- /dev/null
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-4.x thin wrapper that mixes `BatchWrite` into [[PaimonBatchWriteBase]]. See the base class
+ * scaladoc for why the inheritance lives here rather than in `paimon-spark-common`. A duplicate of
+ * this file lives at `paimon-spark-4.0/src/main` so that a 4.0.2 compile target produces a class
+ * file whose method table does not reference `WriteSummary` (4.1-only).
+ *
+ * The wrapper holds no logic of its own: the constructor arguments go straight to the base class,
+ * and each overridden `BatchWrite` method either forwards to a base-class helper or returns a
+ * constant.
+ */
+class PaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+ extends PaimonBatchWriteBase(table, writeSchema, dataSchema,
overwritePartitions, copyOnWriteScan)
+ with BatchWrite
+ with Serializable {
+
+ // Passes `info` on to the `createPaimonDataWriterFactory` helper.
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
+ createPaimonDataWriterFactory(info)
+
+ // Always false: this write does not ask Spark to use its commit coordinator.
+ override def useCommitCoordinator(): Boolean = false
+
+ // Forwards to the `commitMessages` helper.
+ override def commit(messages: Array[WriterCommitMessage]): Unit =
commitMessages(messages)
+
+ // Forwards to the `abortMessages` helper.
+ override def abort(messages: Array[WriterCommitMessage]): Unit =
abortMessages(messages)
+}
+
+/**
+ * Companion providing `PaimonBatchWrite(...)` call syntax without `new`.
+ * NOTE(review): presumably kept for call sites written when `PaimonBatchWrite` was a case
+ * class - confirm it is still needed.
+ */
+object PaimonBatchWrite {
+ /** Builds a new [[PaimonBatchWrite]] from the same five arguments as the class constructor. */
+ def apply(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
+ new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
+}
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 516c02a09f..b87041d3dc 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.{GenericVariant,
Variant}
import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
import
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow,
Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.format.FormatTableBatchWrite
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.write.PaimonBatchWrite
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
import org.apache.paimon.types.{DataType, RowType}
import org.apache.hadoop.conf.Configuration
@@ -38,6 +42,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn,
IdentityColumn, ResolveDefaultColumns}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column,
Identifier, StagingTableCatalog, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex,
PartitionSpec}
import
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec,
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -183,6 +188,21 @@ class Spark4Shim extends SparkShim {
tableCatalog.invalidateTable(ident)
}
+ /**
+ * Returns a new `PaimonBatchWrite` (imported from `org.apache.paimon.spark.write`), built from
+ * the arguments unchanged and exposed to the caller as a plain `BatchWrite`.
+ */
+ override def createPaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
+ new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
+
+ /**
+ * Returns a new `FormatTableBatchWrite` (imported from `org.apache.paimon.spark.format`), built
+ * from the arguments unchanged and exposed to the caller as a plain `BatchWrite`.
+ */
+ override def createFormatTableBatchWrite(
+ table: FormatTable,
+ overwriteDynamic: Option[Boolean],
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType): BatchWrite =
+ new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions,
writeSchema)
+
override def createCTERelationRef(
cteId: Long,
resolved: Boolean,