This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ffca8fbc [spark] Refactor BatchWrite subclasses into base logic + 
per-version wrappers (#7723)
50ffca8fbc is described below

commit 50ffca8fbc4774a933c25406e24afd272567448b
Author: Kerwin Zhang <[email protected]>
AuthorDate: Sat May 23 22:55:09 2026 +0800

    [spark] Refactor BatchWrite subclasses into base logic + per-version 
wrappers (#7723)
    
    Follow-up of #7648 (Spark 4.1 module) and a sibling of #7721. After
    landing the reverse-shim layout, two of the files under
    `paimon-spark-4.0/src/main` only existed as shadows because their
    compilation unit defined a Scala class that **`extends BatchWrite`**.
    Spark 4.1 added a default method
    `BatchWrite.commit(WriterCommitMessage[], WriteSummary)` whose
    `WriteSummary` parameter type does not exist on Spark 4.0; a class
    compiled against 4.1 that mixes in `BatchWrite` carries the inherited
    `commit(.., WriteSummary)` signature in its method table, which the JVM's
    `ObjectStreamClass.getPrivateMethod` lazy-links during Spark task
    serialization, crashing Spark 4.0 with `ClassNotFoundException:
    WriteSummary`.
    
    This PR refactors both affected classes into the same **base +
    per-version wrapper** pattern:
    
    - `PaimonBatchWrite` (used by V2 writes)
    - `FormatTableBatchWrite` (used by `FormatTable` V2 writes — was
    previously a `private case class` inside `PaimonFormatTable.scala`)
    
    For each, the body lives in a new abstract base in `paimon-spark-common`
    that deliberately does *not* extend `BatchWrite` (renamed protected
    helpers: `commitMessages`, `abortMessages`,
    `createPaimonDataWriterFactory`, `createFormatTableDataWriterFactory`).
    Each per-version module (`paimon-spark3-common`, `paimon-spark4-common`,
    `paimon-spark-4.0/src/main`) ships a thin wrapper that mixes in
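    `BatchWrite` and implements its four methods: `createBatchWriterFactory`,
    `commit` and `abort` forward to the base helpers, while
    `useCommitCoordinator` is answered directly by the wrapper (it returns
    `false`). Routing happens through two new `SparkShim` factories
    (`createPaimonBatchWrite`, `createFormatTableBatchWrite`) so each
    Spark version's scalac compiles the right `extends BatchWrite` mixin.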
    
    The Spark 4.0 shadow of `PaimonFormatTable.scala` is no longer needed
    and is deleted; only the new thin `FormatTableBatchWrite.scala` wrapper
    remains under `paimon-spark-4.0/src/main`.
---
 .../spark/format/FormatTableBatchWrite.scala       |  49 ++++++++
 .../paimon/spark/write/PaimonBatchWrite.scala      | 139 ++++-----------------
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |  20 +++
 .../spark/format/FormatTableBatchWriteBase.scala}  |  95 ++++----------
 .../paimon/spark/format/PaimonFormatTable.scala    | 107 +---------------
 ...BatchWrite.scala => PaimonBatchWriteBase.scala} |  70 ++++++-----
 .../apache/paimon/spark/write/PaimonV2Write.scala  |   8 +-
 .../apache/spark/sql/paimon/shims/SparkShim.scala  |  24 ++++
 .../spark/format/FormatTableBatchWrite.scala       |  47 +++++++
 .../paimon/spark/write/PaimonBatchWrite.scala      |  59 +++++++++
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala |  20 +++
 .../spark/format/FormatTableBatchWrite.scala       |  49 ++++++++
 .../paimon/spark/write/PaimonBatchWrite.scala      |  61 +++++++++
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |  20 +++
 14 files changed, 446 insertions(+), 322 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
new file mode 100644
index 0000000000..c05349c504
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark 4.0-compatible shadow of the `paimon-spark4-common` 
`FormatTableBatchWrite`. Compiled
+ * against 4.0.2 so its class file's method table does not carry the 
`commit(.., WriteSummary)`
+ * signature added by Spark 4.1's `BatchWrite` default method, avoiding 
`ClassNotFoundException:
+ * WriteSummary` lazy-linking on 4.0 task serialization.
+ */
+class FormatTableBatchWrite(
+    table: FormatTable,
+    overwriteDynamic: Option[Boolean],
+    overwritePartitions: Option[Map[String, String]],
+    writeSchema: StructType)
+  extends FormatTableBatchWriteBase(table, overwriteDynamic, 
overwritePartitions, writeSchema)
+  with BatchWrite
+  with Serializable {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+    createFormatTableDataWriterFactory()
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = 
commitMessages(messages)
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = 
abortMessages(messages)
+}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index bde8f028c5..c78f783924 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -18,136 +18,45 @@
 
 package org.apache.paimon.spark.write
 
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
-import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.commands.SparkDataFileMeta
-import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl}
 
-import org.apache.spark.sql.PaimonSparkSession
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-case class PaimonBatchWrite(
+/**
+ * Spark-4.0 shadow wrapper. Source-identical to the `paimon-spark4-common` 
version but compiled
+ * against Spark 4.0.2; the maven shade order picks 
`paimon-spark-4.0/target/classes` ahead of the
+ * shaded 4-common copy, so the class metadata loaded at runtime does not 
include the 4.1-only
+ * `BatchWrite.commit(.., WriteSummary)` signature that triggers 
`ClassNotFoundException` via
+ * `ObjectStreamClass.getPrivateMethod` during Spark task serialization.
+ */
+class PaimonBatchWrite(
     table: FileStoreTable,
     writeSchema: StructType,
     dataSchema: StructType,
     overwritePartitions: Option[Map[String, String]],
     copyOnWriteScan: Option[PaimonCopyOnWriteScan])
-  extends BatchWrite
-  with WriteHelper {
-
-  protected val metricRegistry = SparkMetricRegistry()
-
-  @volatile private var commitStarted: Boolean = false
+  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions, copyOnWriteScan)
+  with BatchWrite
+  with Serializable {
 
-  protected val batchWriteBuilder: BatchWriteBuilder = {
-    val builder = table.newBatchWriteBuilder()
-    overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
-    builder
-  }
-
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
-    (_: Int, _: Long) =>
-      {
-        PaimonV2DataWriter(
-          batchWriteBuilder,
-          writeSchema,
-          dataSchema,
-          coreOptions,
-          table.catalogEnvironment().catalogContext())
-      }
-  }
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+    createPaimonDataWriterFactory(info)
 
   override def useCommitCoordinator(): Boolean = false
 
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    commitStarted = true
-    logInfo(s"Committing to table ${table.name()}")
-    val batchTableCommit = batchWriteBuilder.newCommit()
-    batchTableCommit.withMetricRegistry(metricRegistry)
-    val addCommitMessage = WriteTaskResult.merge(messages)
-    val deletedCommitMessage = copyOnWriteScan match {
-      case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles)
-      case None => Seq.empty
-    }
-    val commitMessages = addCommitMessage ++ deletedCommitMessage
-    try {
-      val start = System.currentTimeMillis()
-      batchTableCommit.commit(commitMessages.asJava)
-      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
-    } finally {
-      batchTableCommit.close()
-    }
-    postDriverMetrics()
-    postCommit(commitMessages)
-  }
-
-  // Spark support v2 write driver metrics since 4.0, see 
https://github.com/apache/spark/pull/48573
-  // To ensure compatibility with 3.x, manually post driver metrics here 
instead of using Spark's API.
-  protected def postDriverMetrics(): Unit = {
-    val spark = PaimonSparkSession.active
-    // todo: find a more suitable way to get metrics.
-    val commitMetrics = metricRegistry.buildSparkCommitMetrics()
-    val executionId = 
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val executionMetrics = Compatibility.getExecutionMetrics(spark, 
executionId.toLong).distinct
-    val metricUpdates = executionMetrics.flatMap {
-      m =>
-        commitMetrics.find(x => 
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
-          case Some(customTaskMetric) => Some((m.accumulatorId, 
customTaskMetric.value()))
-          case None => None
-        }
-    }
-    SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, 
executionId, metricUpdates)
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    if (commitStarted) {
-      logWarning(s"Skip abort cleanup for table ${table.name()} because commit 
has already started")
-      return
-    }
+  override def commit(messages: Array[WriterCommitMessage]): Unit = 
commitMessages(messages)
 
-    logInfo(s"Aborting write to table ${table.name()}")
-    val batchTableCommit = batchWriteBuilder.newCommit()
-    try {
-      val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
-      batchTableCommit.abort(commitMessages.asJava)
-    } finally {
-      batchTableCommit.close()
-    }
-  }
-
-  private def buildDeletedCommitMessage(
-      deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
-    logInfo(s"[V2 Write] Building deleted commit message for 
${deletedFiles.size} files")
-    deletedFiles
-      .groupBy(f => (f.partition, f.bucket))
-      .map {
-        case ((partition, bucket), files) =>
-          val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+  override def abort(messages: Array[WriterCommitMessage]): Unit = 
abortMessages(messages)
+}
 
-          new CommitMessageImpl(
-            partition,
-            bucket,
-            files.head.totalBuckets,
-            new DataIncrement(
-              Collections.emptyList[DataFileMeta],
-              deletedDataFileMetas,
-              Collections.emptyList[DataFileMeta]),
-            new CompactIncrement(
-              Collections.emptyList[DataFileMeta],
-              Collections.emptyList[DataFileMeta],
-              Collections.emptyList[DataFileMeta])
-          )
-      }
-      .toSeq
-  }
+object PaimonBatchWrite {
+  def apply(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
+    new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
 }
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index e08c87d4d3..731b655cd9 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.{GenericVariant, 
Variant}
 import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
 import 
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
 import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, 
Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.format.FormatTableBatchWrite
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.write.PaimonBatchWrite
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
 import org.apache.paimon.types.{DataType, RowType}
 
 import org.apache.hadoop.conf.Configuration
@@ -38,6 +42,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, 
IdentityColumn, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
 import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
 import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -199,6 +204,21 @@ class Spark4Shim extends SparkShim {
     tableCatalog.invalidateTable(ident)
   }
 
+  override def createPaimonBatchWrite(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
+    new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
+
+  override def createFormatTableBatchWrite(
+      table: FormatTable,
+      overwriteDynamic: Option[Boolean],
+      overwritePartitions: Option[Map[String, String]],
+      writeSchema: StructType): BatchWrite =
+    new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, 
writeSchema)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala
similarity index 55%
rename from 
paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala
index 4f55579a68..853c7f4146 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala
@@ -18,89 +18,38 @@
 
 package org.apache.paimon.spark.format
 
-import org.apache.paimon.format.csv.CsvOptions
-import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, 
SparkInternalRowWrapper}
-import org.apache.paimon.spark.write.{BaseV2WriteBuilder, 
FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult}
+import org.apache.paimon.spark.SparkInternalRowWrapper
+import org.apache.paimon.spark.write.{FormatTableWriteTaskResult, V2DataWrite, 
WriteTaskResult}
 import org.apache.paimon.table.FormatTable
 import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, 
CommitMessage}
-import org.apache.paimon.types.RowType
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
TableCapability, TableCatalog}
-import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, 
BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.connector.write.{DataWriterFactory, 
WriterCommitMessage}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import java.util
-import java.util.Locale
 
 import scala.collection.JavaConverters._
 
-case class PaimonFormatTable(table: FormatTable)
-  extends BaseTable
-  with SupportsRead
-  with SupportsWrite {
-
-  override def capabilities(): util.Set[TableCapability] = {
-    util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, 
OVERWRITE_BY_FILTER)
-  }
-
-  override def properties: util.Map[String, String] = {
-    val properties = new util.HashMap[String, String](table.options())
-    properties.put(TableCatalog.PROP_PROVIDER, 
table.format.name().toLowerCase(Locale.ROOT))
-    if (table.comment.isPresent) {
-      properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
-    }
-    if (FormatTable.Format.CSV == table.format) {
-      properties.put(
-        "sep",
-        properties.getOrDefault(
-          CsvOptions.FIELD_DELIMITER.key(),
-          CsvOptions.FIELD_DELIMITER.defaultValue()))
-    }
-    properties
-  }
-
-  override def newScanBuilder(caseInsensitiveStringMap: 
CaseInsensitiveStringMap): ScanBuilder = {
-    val scanBuilder = 
FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
-    scanBuilder.pruneColumns(schema)
-    scanBuilder
-  }
-
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    PaimonFormatTableWriterBuilder(table, info.schema)
-  }
-}
-
-case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: 
StructType)
-  extends BaseV2WriteBuilder(table) {
-
-  override def partitionRowType(): RowType = table.partitionType
-
-  override def build: Write = new Write() {
-    override def toBatch: BatchWrite = {
-      FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, 
writeSchema)
-    }
-
-    override def toStreaming: StreamingWrite = {
-      throw new UnsupportedOperationException("FormatTable doesn't support 
streaming write")
-    }
-  }
-}
-
-private case class FormatTableBatchWrite(
+/**
+ * Business logic for `FormatTable` batch writes, deliberately *not* extending
+ * `org.apache.spark.sql.connector.write.BatchWrite`. See
+ * [[org.apache.paimon.spark.write.PaimonBatchWriteBase]] for the full 
rationale: Spark 4.1 added a
+ * default method `BatchWrite.commit(.., WriteSummary)` whose `WriteSummary` 
parameter type is
+ * unavailable on Spark 4.0, so a class compiled against 4.1 that mixes in 
`BatchWrite` triggers
+ * `ClassNotFoundException: WriteSummary` lazy-linking on 4.0 runtimes during 
Spark task
+ * serialization. Keeping this base off `BatchWrite` lets common ship the 
implementation once;
+ * per-version `paimon-spark{3,4}-common` modules supply a thin wrapper that 
mixes in `BatchWrite`,
+ * and `paimon-spark-4.0/src/main` shadows that wrapper at the 4.0.2 compile 
target.
+ */
+abstract class FormatTableBatchWriteBase(
     table: FormatTable,
     overwriteDynamic: Option[Boolean],
     overwritePartitions: Option[Map[String, String]],
     writeSchema: StructType)
-  extends BatchWrite
-  with Logging {
+  extends Logging
+  with Serializable {
 
-  private val batchWriteBuilder = {
+  protected val batchWriteBuilder: BatchWriteBuilder = {
     val builder = table.newBatchWriteBuilder()
     // todo: add test for static overwrite the whole table
     if (overwriteDynamic.contains(true)) {
@@ -111,13 +60,11 @@ private case class FormatTableBatchWrite(
     builder
   }
 
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
+  protected def createFormatTableDataWriterFactory(): DataWriterFactory = {
     (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder, 
writeSchema)
   }
 
-  override def useCommitCoordinator(): Boolean = false
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = {
     logInfo(s"Committing to FormatTable ${table.name()}")
     val batchTableCommit = batchWriteBuilder.newCommit()
     val commitMessages = WriteTaskResult.merge(messages).asJava
@@ -132,7 +79,7 @@ private case class FormatTableBatchWrite(
     }
   }
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+  protected def abortMessages(messages: Array[WriterCommitMessage]): Unit = {
     logInfo(s"Aborting write to FormatTable ${table.name()}")
     val batchTableCommit = batchWriteBuilder.newCommit()
     val commitMessages = WriteTaskResult.merge(messages).asJava
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 4f55579a68..dc7effd39f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -19,27 +19,23 @@
 package org.apache.paimon.spark.format
 
 import org.apache.paimon.format.csv.CsvOptions
-import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, 
SparkInternalRowWrapper}
-import org.apache.paimon.spark.write.{BaseV2WriteBuilder, 
FormatTableWriteTaskResult, V2DataWrite, WriteTaskResult}
+import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder}
+import org.apache.paimon.spark.write.BaseV2WriteBuilder
 import org.apache.paimon.table.FormatTable
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, 
CommitMessage}
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, 
TableCapability, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, 
BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import java.util
 import java.util.Locale
 
-import scala.collection.JavaConverters._
-
 case class PaimonFormatTable(table: FormatTable)
   extends BaseTable
   with SupportsRead
@@ -83,7 +79,8 @@ case class PaimonFormatTableWriterBuilder(table: FormatTable, 
writeSchema: Struc
 
   override def build: Write = new Write() {
     override def toBatch: BatchWrite = {
-      FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, 
writeSchema)
+      SparkShimLoader.shim
+        .createFormatTableBatchWrite(table, overwriteDynamic, 
overwritePartitions, writeSchema)
     }
 
     override def toStreaming: StreamingWrite = {
@@ -91,97 +88,3 @@ case class PaimonFormatTableWriterBuilder(table: 
FormatTable, writeSchema: Struc
     }
   }
 }
-
-private case class FormatTableBatchWrite(
-    table: FormatTable,
-    overwriteDynamic: Option[Boolean],
-    overwritePartitions: Option[Map[String, String]],
-    writeSchema: StructType)
-  extends BatchWrite
-  with Logging {
-
-  private val batchWriteBuilder = {
-    val builder = table.newBatchWriteBuilder()
-    // todo: add test for static overwrite the whole table
-    if (overwriteDynamic.contains(true)) {
-      builder.withOverwrite()
-    } else {
-      overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
-    }
-    builder
-  }
-
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
-    (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder, 
writeSchema)
-  }
-
-  override def useCommitCoordinator(): Boolean = false
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    logInfo(s"Committing to FormatTable ${table.name()}")
-    val batchTableCommit = batchWriteBuilder.newCommit()
-    val commitMessages = WriteTaskResult.merge(messages).asJava
-    try {
-      val start = System.currentTimeMillis()
-      batchTableCommit.commit(commitMessages)
-      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
-    } catch {
-      case e: Exception =>
-        logError("Failed to commit FormatTable writes", e)
-        throw e
-    }
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    logInfo(s"Aborting write to FormatTable ${table.name()}")
-    val batchTableCommit = batchWriteBuilder.newCommit()
-    val commitMessages = WriteTaskResult.merge(messages).asJava
-    batchTableCommit.abort(commitMessages)
-  }
-}
-
-private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, 
writeSchema: StructType)
-  extends V2DataWrite
-  with Logging {
-
-  private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow 
= {
-    val numFields = writeSchema.fields.length
-    record => {
-      new SparkInternalRowWrapper(writeSchema, numFields).replace(record)
-    }
-  }
-
-  private val write: BatchTableWrite = batchWriteBuilder.newWrite()
-
-  override def write(record: InternalRow): Unit = {
-    val paimonRow = rowConverter.apply(record)
-    write.write(paimonRow)
-  }
-
-  override def commitImpl(): Seq[CommitMessage] = {
-    write.prepareCommit().asScala.toSeq
-  }
-
-  def buildWriteTaskResult(commitMessages: Seq[CommitMessage]): 
FormatTableWriteTaskResult = {
-    FormatTableWriteTaskResult(commitMessages)
-  }
-
-  override def commit: FormatTableWriteTaskResult = {
-    super.commit.asInstanceOf[FormatTableWriteTaskResult]
-  }
-
-  override def abort(): Unit = {
-    logInfo("Aborting FormatTable data writer")
-    close()
-  }
-
-  override def close(): Unit = {
-    try {
-      write.close()
-    } catch {
-      case e: Exception =>
-        logError("Error closing FormatTableDataWriter", e)
-        throw new RuntimeException(e)
-    }
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
similarity index 76%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
index 1f2abae0b0..d19f1a7096 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala
@@ -27,7 +27,7 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl}
 
 import org.apache.spark.sql.PaimonSparkSession
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
@@ -36,18 +36,30 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-case class PaimonBatchWrite(
-    table: FileStoreTable,
-    writeSchema: StructType,
-    dataSchema: StructType,
-    overwritePartitions: Option[Map[String, String]],
-    copyOnWriteScan: Option[PaimonCopyOnWriteScan])
-  extends BatchWrite
-  with WriteHelper {
+/**
+ * Business logic for Paimon batch writes, deliberately *not* extending
+ * `org.apache.spark.sql.connector.write.BatchWrite`. Spark 4.1 added a 
default method
+ * `BatchWrite.commit(WriterCommitMessage[], WriteSummary)` whose 
`WriteSummary` parameter type does
+ * not exist on Spark 4.0; a class compiled against 4.1 that declares `extends 
BatchWrite` carries
+ * the inherited `commit(.., WriteSummary)` signature in its method table, 
which JVM
+ * `ObjectStreamClass.getPrivateMethod` lazy-links during Spark task 
serialization, crashing on 4.0
+ * with `ClassNotFoundException: WriteSummary`. Keeping this base off 
`BatchWrite` lets common ship
+ * the implementation once; per-version `paimon-spark{3,4}-common` modules 
supply a thin wrapper
+ * that mixes in `BatchWrite`, and `paimon-spark-4.0/src/main` shadows that 
wrapper at the 4.0.2
+ * compile target.
+ */
+abstract class PaimonBatchWriteBase(
+    val table: FileStoreTable,
+    val writeSchema: StructType,
+    val dataSchema: StructType,
+    val overwritePartitions: Option[Map[String, String]],
+    val copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+  extends WriteHelper
+  with Serializable {
 
   protected val metricRegistry = SparkMetricRegistry()
 
-  @volatile private var commitStarted: Boolean = false
+  @volatile protected var commitStarted: Boolean = false
 
   protected val batchWriteBuilder: BatchWriteBuilder = {
     val builder = table.newBatchWriteBuilder()
@@ -55,7 +67,7 @@ case class PaimonBatchWrite(
     builder
   }
 
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
+  protected def createPaimonDataWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
     (_: Int, _: Long) =>
       {
         PaimonV2DataWriter(
@@ -67,9 +79,7 @@ case class PaimonBatchWrite(
       }
   }
 
-  override def useCommitCoordinator(): Boolean = false
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+  protected def commitMessages(messages: Array[WriterCommitMessage]): Unit = {
     commitStarted = true
     logInfo(s"Committing to table ${table.name()}")
     val batchTableCommit = batchWriteBuilder.newCommit()
@@ -91,6 +101,22 @@ case class PaimonBatchWrite(
     postCommit(commitMessages)
   }
 
+  protected def abortMessages(messages: Array[WriterCommitMessage]): Unit = {
+    if (commitStarted) {
+      logWarning(s"Skip abort cleanup for table ${table.name()} because commit 
has already started")
+      return
+    }
+
+    logInfo(s"Aborting write to table ${table.name()}")
+    val batchTableCommit = batchWriteBuilder.newCommit()
+    try {
+      val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
+      batchTableCommit.abort(commitMessages.asJava)
+    } finally {
+      batchTableCommit.close()
+    }
+  }
+
   // Spark has supported v2 write driver metrics since 4.0, see 
https://github.com/apache/spark/pull/48573
   // To ensure compatibility with 3.x, manually post driver metrics here 
instead of using Spark's API.
   protected def postDriverMetrics(): Unit = {
@@ -109,22 +135,6 @@ case class PaimonBatchWrite(
     SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, 
executionId, metricUpdates)
   }
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    if (commitStarted) {
-      logWarning(s"Skip abort cleanup for table ${table.name()} because commit 
has already started")
-      return
-    }
-
-    logInfo(s"Aborting write to table ${table.name()}")
-    val batchTableCommit = batchWriteBuilder.newCommit()
-    try {
-      val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
-      batchTableCommit.abort(commitMessages.asJava)
-    } finally {
-      batchTableCommit.close()
-    }
-  }
-
   private def buildDeletedCommitMessage(
       deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
     logInfo(s"[V2 Write] Building deleted commit message for 
${deletedFiles.size} files")
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index e4676ae4af..c7a9a9ff3a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -31,6 +31,7 @@ import 
org.apache.spark.sql.connector.distributions.Distribution
 import org.apache.spark.sql.connector.expressions.SortOrder
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.mutable
@@ -62,7 +63,12 @@ class PaimonV2Write(
   }
 
   override def toBatch: BatchWrite = {
-    PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
+    SparkShimLoader.shim.createPaimonBatchWrite(
+      table,
+      writeSchema,
+      dataSchema,
+      overwritePartitions,
+      copyOnWriteScan)
   }
 
   override def supportedCustomMetrics(): Array[CustomMetric] = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 38efd8c006..b8b26237f0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.paimon.shims
 
 import org.apache.paimon.data.variant.Variant
 import org.apache.paimon.spark.data.{SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
 import org.apache.paimon.types.{DataType, RowType}
 
 import org.apache.spark.sql.SparkSession
@@ -32,6 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.connector.catalog.{Column, Identifier, 
StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.StructType
@@ -112,6 +115,27 @@ trait SparkShim {
       additionalProperties: Map[String, String],
       location: Option[String]): TableSpec
 
+  /**
+   * Constructs a `BatchWrite` for Paimon's V2 write path. The implementation 
lives in each
+   * per-version shim module so the `extends BatchWrite` mixin is compiled 
against the right Spark
+   * minor version: Spark 4.1 added a default method `BatchWrite.commit(.., 
WriteSummary)` whose
+   * inherited signature fails lazy linking with `ClassNotFoundException: WriteSummary` 
on Spark 4.0
+   * runtimes when the class is loaded for task serialization.
+   */
+  def createPaimonBatchWrite(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite
+
+  /** Same `BatchWrite` mixin problem as [[createPaimonBatchWrite]], but for 
`FormatTable` writes. */
+  def createFormatTableBatchWrite(
+      table: FormatTable,
+      overwriteDynamic: Option[Boolean],
+      overwritePartitions: Option[Map[String, String]],
+      writeSchema: StructType): BatchWrite
+
   def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
new file mode 100644
index 0000000000..d13c737370
--- /dev/null
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-3.x thin wrapper that mixes `BatchWrite` into 
[[FormatTableBatchWriteBase]]. See the base
+ * class scaladoc for why the inheritance lives here rather than in 
`paimon-spark-common`.
+ */
+class FormatTableBatchWrite(
+    table: FormatTable,
+    overwriteDynamic: Option[Boolean],
+    overwritePartitions: Option[Map[String, String]],
+    writeSchema: StructType)
+  extends FormatTableBatchWriteBase(table, overwriteDynamic, 
overwritePartitions, writeSchema)
+  with BatchWrite
+  with Serializable {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+    createFormatTableDataWriterFactory()
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = 
commitMessages(messages)
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = 
abortMessages(messages)
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
new file mode 100644
index 0000000000..89eb15054c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-3.x thin wrapper that mixes `BatchWrite` into 
[[PaimonBatchWriteBase]]. See the base class
+ * scaladoc for why the inheritance lives here rather than in 
`paimon-spark-common`.
+ */
+class PaimonBatchWrite(
+    table: FileStoreTable,
+    writeSchema: StructType,
+    dataSchema: StructType,
+    overwritePartitions: Option[Map[String, String]],
+    copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions, copyOnWriteScan)
+  with BatchWrite
+  with Serializable {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+    createPaimonDataWriterFactory(info)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = 
commitMessages(messages)
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = 
abortMessages(messages)
+}
+
+object PaimonBatchWrite {
+  def apply(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
+    new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 1798b12410..682dc89014 100644
--- 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.Variant
 import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
 import 
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
 import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, 
Spark3InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.format.FormatTableBatchWrite
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.write.PaimonBatchWrite
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
 import org.apache.paimon.types.{DataType, RowType}
 
 import org.apache.hadoop.conf.Configuration
@@ -42,6 +46,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, 
GeneratedColumn, ResolveDe
 import org.apache.spark.sql.connector.catalog.{Column, Identifier, 
StagingTableCatalog, Table, TableCatalog}
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Columns
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
 import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
 import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -185,6 +190,21 @@ class Spark3Shim extends SparkShim {
     tableCatalog.invalidateTable(ident)
   }
 
+  override def createPaimonBatchWrite(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
+    new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
+
+  override def createFormatTableBatchWrite(
+      table: FormatTable,
+      overwriteDynamic: Option[Boolean],
+      overwritePartitions: Option[Map[String, String]],
+      writeSchema: StructType): BatchWrite =
+    new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, 
writeSchema)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
new file mode 100644
index 0000000000..4fa58c95a7
--- /dev/null
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.format
+
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-4.x thin wrapper that mixes `BatchWrite` into 
[[FormatTableBatchWriteBase]]. See the base
+ * class scaladoc for why the inheritance lives here rather than in 
`paimon-spark-common`. A
+ * duplicate of this file lives at `paimon-spark-4.0/src/main` so that a 4.0.2 
compile target
+ * produces a class file whose method table does not reference `WriteSummary` 
(4.1-only).
+ */
+class FormatTableBatchWrite(
+    table: FormatTable,
+    overwriteDynamic: Option[Boolean],
+    overwritePartitions: Option[Map[String, String]],
+    writeSchema: StructType)
+  extends FormatTableBatchWriteBase(table, overwriteDynamic, 
overwritePartitions, writeSchema)
+  with BatchWrite
+  with Serializable {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+    createFormatTableDataWriterFactory()
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = 
commitMessages(messages)
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = 
abortMessages(messages)
+}
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
new file mode 100644
index 0000000000..38683e79c0
--- /dev/null
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Spark-4.x thin wrapper that mixes `BatchWrite` into 
[[PaimonBatchWriteBase]]. See the base class
+ * scaladoc for why the inheritance lives here rather than in 
`paimon-spark-common`. A duplicate of
+ * this file lives at `paimon-spark-4.0/src/main` so that a 4.0.2 compile 
target produces a class
+ * file whose method table does not reference `WriteSummary` (4.1-only).
+ */
+class PaimonBatchWrite(
+    table: FileStoreTable,
+    writeSchema: StructType,
+    dataSchema: StructType,
+    overwritePartitions: Option[Map[String, String]],
+    copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions, copyOnWriteScan)
+  with BatchWrite
+  with Serializable {
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+    createPaimonDataWriterFactory(info)
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = 
commitMessages(messages)
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = 
abortMessages(messages)
+}
+
+object PaimonBatchWrite {
+  def apply(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
+    new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
+}
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 516c02a09f..b87041d3dc 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -22,6 +22,10 @@ import org.apache.paimon.data.variant.{GenericVariant, 
Variant}
 import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
 import 
org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
 import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, 
Spark4InternalRowWithBlob, SparkArrayData, SparkInternalRow}
+import org.apache.paimon.spark.format.FormatTableBatchWrite
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.write.PaimonBatchWrite
+import org.apache.paimon.table.{FileStoreTable, FormatTable}
 import org.apache.paimon.types.{DataType, RowType}
 
 import org.apache.hadoop.conf.Configuration
@@ -38,6 +42,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{ArrayData, GeneratedColumn, 
IdentityColumn, ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
Identifier, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.write.BatchWrite
 import org.apache.spark.sql.execution.{SparkFormatTable, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
 import 
org.apache.spark.sql.execution.datasources.v2.{AtomicReplaceTableAsSelectExec, 
AtomicReplaceTableExec, ReplaceTableAsSelectExec, ReplaceTableExec}
@@ -183,6 +188,21 @@ class Spark4Shim extends SparkShim {
     tableCatalog.invalidateTable(ident)
   }
 
+  override def createPaimonBatchWrite(
+      table: FileStoreTable,
+      writeSchema: StructType,
+      dataSchema: StructType,
+      overwritePartitions: Option[Map[String, String]],
+      copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
+    new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
+
+  override def createFormatTableBatchWrite(
+      table: FormatTable,
+      overwriteDynamic: Option[Boolean],
+      overwritePartitions: Option[Map[String, String]],
+      writeSchema: StructType): BatchWrite =
+    new FormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, 
writeSchema)
+
   override def createCTERelationRef(
       cteId: Long,
       resolved: Boolean,

Reply via email to