This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4021e5bc96 [spark] Add spark v2 write (#5531)
4021e5bc96 is described below

commit 4021e5bc96f30e7bfb07eb27d1a6440baa47d91c
Author: Yujiang Zhong <[email protected]>
AuthorDate: Sat May 3 21:22:48 2025 +0800

    [spark] Add spark v2 write (#5531)
---
 .../analysis/expressions/ExpressionHelper.scala    |  37 ++++
 .../apache/paimon/spark/SparkConnectorOptions.java |   7 +
 .../scala/org/apache/paimon/spark/SparkTable.scala |  45 ++++-
 .../analysis/expressions/ExpressionHelper.scala    |  39 ++++-
 .../paimon/spark/commands/PaimonCommand.scala      |  38 +---
 .../org/apache/paimon/spark/util/OptionUtils.scala |   4 +
 .../BaseWriteBuilder.scala}                        |  33 +---
 .../apache/paimon/spark/write/SparkV2Write.scala   | 193 +++++++++++++++++++++
 .../paimon/spark/write/SparkV2WriteBuilder.scala   |  69 ++++++++
 .../paimon/spark/{ => write}/SparkWrite.scala      |   3 +-
 .../SparkWriteBuilder.scala}                       |  36 ++--
 .../paimon/spark/write/SparkWriteRequirement.scala |  67 +++++++
 .../catalog/functions/BucketFunctionTest.java      |  36 ++--
 .../apache/paimon/spark/sql/SparkWriteITCase.scala |   6 +
 .../spark/sql/V2WriteRequireDistributionTest.scala | 145 ++++++++++++++++
 15 files changed, 664 insertions(+), 94 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 56223c36cd..f4890f84bd 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -20,10 +20,12 @@ package 
org.apache.paimon.spark.catalyst.analysis.expressions
 
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.spark.write.SparkWriteBuilder
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilter}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, 
EqualNullSafe, EqualTo, Filter => SourceFilter}
 
 trait ExpressionHelper extends ExpressionHelperBase {
 
@@ -52,4 +54,39 @@ trait ExpressionHelper extends ExpressionHelperBase {
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
+
+  /**
+   * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
+   * methods where the `AlwaysTrue` Filter is used.
+   */
+  def isTruncate(filter: SourceFilter): Boolean = {
+    val filters = splitConjunctiveFilters(filter)
+    filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
+  }
+
+  /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
+  def convertPartitionFilterToMap(
+      filter: SourceFilter,
+      partitionRowType: RowType): Map[String, String] = {
+    // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
+    val converter = new SparkFilterConverter(partitionRowType)
+    splitConjunctiveFilters(filter).map {
+      case EqualNullSafe(attribute, value) =>
+        (attribute, converter.convertString(attribute, value))
+      case EqualTo(attribute, value) =>
+        (attribute, converter.convertString(attribute, value))
+      case _ =>
+        // Should not happen
+        throw new RuntimeException(
+          s"Only support Overwrite filters with Equal and EqualNullSafe, but 
got: $filter")
+    }.toMap
+  }
+
+  private def splitConjunctiveFilters(filter: SourceFilter): Seq[SourceFilter] 
= {
+    filter match {
+      case SourceAnd(filter1, filter2) =>
+        splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
+      case other => other :: Nil
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 539f5a11e3..f2c0877bb4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -46,6 +46,13 @@ public class SparkConnectorOptions {
                     .withDescription(
                             "If true, allow to merge data types if the two 
types meet the rules for explicit casting.");
 
+    public static final ConfigOption<Boolean> USE_V2_WRITE =
+            key("write.use-v2-write")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, v2 write will be used. Currently, only 
HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 
write for other bucket modes. Currently, Spark V2 write does not support 
TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.");
+
     public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
             key("read.stream.maxFilesPerTrigger")
                     .intType()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index b9a90d8b5b..bdc0d2cc29 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -20,8 +20,11 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalog.functions.BucketFunction
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.{DataTable, FileStoreTable, KnownSplitsTable, 
Table}
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.spark.write.{SparkV2WriteBuilder, SparkWriteBuilder}
+import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, 
KnownSplitsTable, Table}
 import org.apache.paimon.utils.StringUtils
 
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, 
TableCatalog}
@@ -43,6 +46,26 @@ case class SparkTable(table: Table)
   with SupportsMetadataColumns
   with PaimonPartitionManagement {
 
+  private lazy val useV2Write: Boolean = {
+    val v2WriteConfigured = OptionUtils.useV2Write
+    v2WriteConfigured && supportsV2Write
+  }
+
+  private def supportsV2Write: Boolean = {
+    table match {
+      case storeTable: FileStoreTable =>
+        storeTable.bucketMode() match {
+          case BucketMode.HASH_FIXED =>
+            storeTable.coreOptions().bucket() > 0 && 
BucketFunction.supportsTable(storeTable)
+          case BucketMode.BUCKET_UNAWARE =>
+            storeTable.coreOptions().bucket() == BucketMode.UNAWARE_BUCKET
+          case _ => false
+        }
+
+      case _ => false
+    }
+  }
+
   def getTable: Table = table
 
   override def name: String = table.fullName
@@ -73,14 +96,21 @@ case class SparkTable(table: Table)
   }
 
   override def capabilities: JSet[TableCapability] = {
-    JEnumSet.of(
-      TableCapability.ACCEPT_ANY_SCHEMA,
+    val capabilities = JEnumSet.of(
       TableCapability.BATCH_READ,
-      TableCapability.V1_BATCH_WRITE,
       TableCapability.OVERWRITE_BY_FILTER,
       TableCapability.OVERWRITE_DYNAMIC,
       TableCapability.MICRO_BATCH_READ
     )
+
+    if (useV2Write) {
+      capabilities.add(TableCapability.BATCH_WRITE)
+    } else {
+      capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
+      capabilities.add(TableCapability.V1_BATCH_WRITE)
+    }
+
+    capabilities
   }
 
   override def metadataColumns: Array[MetadataColumn] = {
@@ -105,7 +135,12 @@ case class SparkTable(table: Table)
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     table match {
       case fileStoreTable: FileStoreTable =>
-        new SparkWriteBuilder(fileStoreTable, Options.fromMap(info.options))
+        val options = Options.fromMap(info.options)
+        if (useV2Write) {
+          new SparkV2WriteBuilder(fileStoreTable, info.schema())
+        } else {
+          new SparkWriteBuilder(fileStoreTable, options)
+        }
       case _ =>
         throw new RuntimeException("Only FileStoreTable can be written.")
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index fcece0a262..2cb739ee6d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -19,8 +19,9 @@
 package org.apache.paimon.spark.catalyst.analysis.expressions
 
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
-import org.apache.paimon.spark.SparkV2FilterConverter
+import org.apache.paimon.spark.{SparkFilterConverter, SparkV2FilterConverter}
 import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.write.SparkWriteBuilder
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.{Column, SparkSession}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, And, 
Attribute, Cast, E
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, 
EqualNullSafe, EqualTo, Filter => SourceFilter}
 import org.apache.spark.sql.types.{DataType, NullType}
 
 /** An expression helper. */
@@ -64,6 +66,41 @@ trait ExpressionHelper extends ExpressionHelperBase {
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
+
+  /**
+   * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
+   * methods where the `AlwaysTrue` Filter is used.
+   */
+  def isTruncate(filter: SourceFilter): Boolean = {
+    val filters = splitConjunctiveFilters(filter)
+    filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
+  }
+
+  /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
+  def convertPartitionFilterToMap(
+      filter: SourceFilter,
+      partitionRowType: RowType): Map[String, String] = {
+    // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
+    val converter = new SparkFilterConverter(partitionRowType)
+    splitConjunctiveFilters(filter).map {
+      case EqualNullSafe(attribute, value) =>
+        (attribute, converter.convertString(attribute, value))
+      case EqualTo(attribute, value) =>
+        (attribute, converter.convertString(attribute, value))
+      case _ =>
+        // Should not happen
+        throw new RuntimeException(
+          s"Only support Overwrite filters with Equal and EqualNullSafe, but 
got: $filter")
+    }.toMap
+  }
+
+  private def splitConjunctiveFilters(filter: SourceFilter): Seq[SourceFilter] 
= {
+    filter match {
+      case SourceAnd(filter1, filter2) =>
+        splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
+      case other => other :: Nil
+    }
+  }
 }
 
 trait ExpressionHelperBase extends PredicateHelper {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 0b466c8efd..907d8be85c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -23,7 +23,7 @@ import 
org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletion
 import org.apache.paimon.fs.Path
 import org.apache.paimon.index.IndexFileMeta
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, 
IndexIncrement}
-import org.apache.paimon.spark.{SparkFilterConverter, SparkTable}
+import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
@@ -31,7 +31,6 @@ import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.table.{BucketMode, FileStoreTable, KnownSplitsTable}
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
-import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.SerializationUtils
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -69,41 +68,6 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper with SQLCon
 
   }
 
-  /**
-   * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
-   * methods where the `AlwaysTrue` Filter is used.
-   */
-  def isTruncate(filter: Filter): Boolean = {
-    val filters = splitConjunctiveFilters(filter)
-    filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
-  }
-
-  /** See [[ org.apache.paimon.spark.SparkWriteBuilder#failIfCanNotOverwrite]] 
*/
-  def convertPartitionFilterToMap(
-      filter: Filter,
-      partitionRowType: RowType): Map[String, String] = {
-    // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
-    val converter = new SparkFilterConverter(partitionRowType)
-    splitConjunctiveFilters(filter).map {
-      case EqualNullSafe(attribute, value) =>
-        (attribute, converter.convertString(attribute, value))
-      case EqualTo(attribute, value) =>
-        (attribute, converter.convertString(attribute, value))
-      case _ =>
-        // Should not happen
-        throw new RuntimeException(
-          s"Only support Overwrite filters with Equal and EqualNullSafe, but 
got: $filter")
-    }.toMap
-  }
-
-  private def splitConjunctiveFilters(filter: Filter): Seq[Filter] = {
-    filter match {
-      case And(filter1, filter2) =>
-        splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
-      case other => other :: Nil
-    }
-  }
-
   /** Gets a relative path against the table path. */
   protected def relativePath(absolutePath: String): String = {
     val location = table.location().toUri
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 7315730faf..ce66bb838d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -59,6 +59,10 @@ object OptionUtils extends SQLConfHelper {
     }
   }
 
+  def useV2Write(): Boolean = {
+    getOptionString(SparkConnectorOptions.USE_V2_WRITE).toBoolean
+  }
+
   def extractCatalogName(): Option[String] = {
     val sparkCatalogTemplate = String.format("%s([^.]*)$", 
SPARK_CATALOG_PREFIX)
     val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseWriteBuilder.scala
similarity index 77%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseWriteBuilder.scala
index 74a474b8c3..663193257e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseWriteBuilder.scala
@@ -16,27 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.write
 
-import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
-import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, And, 
EqualNullSafe, EqualTo, Filter, Not, Or}
+import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.sources._
 
 import scala.collection.JavaConverters._
 
-private class SparkWriteBuilder(table: FileStoreTable, options: Options)
+abstract class BaseWriteBuilder(table: FileStoreTable)
   extends WriteBuilder
-  with SupportsOverwrite
+  with ExpressionHelper
   with SQLConfHelper {
 
-  private var saveMode: SaveMode = InsertInto
-
-  override def build = new SparkWrite(table, saveMode, options)
-
-  private def failWithReason(filter: Filter): Unit = {
+  protected def failWithReason(filter: Filter): Unit = {
     throw new RuntimeException(
       s"Only support Overwrite filters with Equal and EqualNullSafe, but got: 
$filter")
   }
@@ -55,7 +51,7 @@ private class SparkWriteBuilder(table: FileStoreTable, 
options: Options)
 
   // `SupportsOverwrite#canOverwrite` is added since Spark 3.4.0.
   // We do this checking by self to work with previous Spark version.
-  private def failIfCanNotOverwrite(filters: Array[Filter]): Unit = {
+  protected def failIfCanNotOverwrite(filters: Array[Filter]): Unit = {
     // For now, we only support overwrite with two cases:
     // - overwrite with partition columns to be compatible with v1 insert 
overwrite
     //   See 
[[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveInsertInto#staticDeleteExpression]].
@@ -81,17 +77,4 @@ private class SparkWriteBuilder(table: FileStoreTable, 
options: Options)
     }
     filters.foreach(validateFilter)
   }
-
-  override def overwrite(filters: Array[Filter]): WriteBuilder = {
-    failIfCanNotOverwrite(filters)
-
-    val conjunctiveFilters = if (filters.nonEmpty) {
-      Some(filters.reduce((l, r) => And(l, r)))
-    } else {
-      None
-    }
-    this.saveMode = Overwrite(conjunctiveFilters)
-    this
-  }
-
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
new file mode 100644
index 0000000000..3f2018060e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, 
CommitMessage, CommitMessageSerializer}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.distributions.Distribution
+import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.types.StructType
+
+import java.io.{IOException, UncheckedIOException}
+
+import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success, Try}
+
+class SparkV2Write(
+    storeTable: FileStoreTable,
+    overwriteDynamic: Boolean,
+    overwritePartitions: Option[Map[String, String]],
+    writeSchema: StructType
+) extends Write
+  with RequiresDistributionAndOrdering
+  with Logging {
+
+  assert(
+    !(overwriteDynamic && overwritePartitions.nonEmpty),
+    "Cannot overwrite dynamically and by filter both")
+
+  private val table =
+    storeTable.copy(
+      Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> 
overwriteDynamic.toString).asJava)
+
+  private val batchWriteBuilder = {
+    val builder = table.newBatchWriteBuilder()
+    overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
+    builder
+  }
+
+  private val writeRequirement = SparkWriteRequirement(table)
+
+  override def requiredDistribution(): Distribution = {
+    val distribution = writeRequirement.distribution
+    logInfo(s"Requesting $distribution as write distribution for table 
${table.name()}")
+    distribution
+  }
+
+  override def requiredOrdering(): Array[SortOrder] = {
+    val ordering = writeRequirement.ordering
+    logInfo(s"Requesting ${ordering.mkString(",")} as write ordering for table 
${table.name()}")
+    ordering
+  }
+
+  override def toBatch: BatchWrite = new PaimonBatchWrite
+
+  override def toString: String =
+    if (overwriteDynamic)
+      s"PaimonWrite(table=${table.fullName()}, overwriteDynamic=true)"
+    else
+      s"PaimonWrite(table=${table.fullName()}, 
overwritePartitions=$overwritePartitions)"
+
+  private class PaimonBatchWrite extends BatchWrite {
+    override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
+      WriterFactory(writeSchema, batchWriteBuilder)
+
+    override def useCommitCoordinator(): Boolean = false
+
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {
+      logInfo(s"Committing to table ${table.name()}")
+      val batchTableCommit = batchWriteBuilder.newCommit()
+
+      val commitMessages = messages.collect {
+        case taskCommit: TaskCommit => taskCommit.commitMessages
+        case other =>
+          throw new IllegalArgumentException(s"${other.getClass.getName} is 
not supported")
+      }.flatten
+
+      try {
+        val start = System.currentTimeMillis()
+        batchTableCommit.commit(commitMessages.toList.asJava)
+        logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+      } finally {
+        batchTableCommit.close()
+      }
+    }
+
+    override def abort(messages: Array[WriterCommitMessage]): Unit = {
+      // TODO clean uncommitted files
+    }
+  }
+}
+
+private case class WriterFactory(writeSchema: StructType, batchWriteBuilder: 
BatchWriteBuilder)
+  extends DataWriterFactory {
+
+  override def createWriter(partitionId: Int, taskId: Long): 
DataWriter[InternalRow] = {
+    val batchTableWrite = batchWriteBuilder.newWrite()
+    new PaimonDataWriter(batchTableWrite, writeSchema)
+  }
+}
+
+private class PaimonDataWriter(batchTableWrite: BatchTableWrite, writeSchema: 
StructType)
+  extends DataWriter[InternalRow] {
+
+  private val ioManager = SparkUtils.createIOManager()
+  batchTableWrite.withIOManager(ioManager)
+
+  private val rowConverter: InternalRow => SparkInternalRowWrapper = {
+    val numFields = writeSchema.fields.length
+    val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, 
numFields)
+    record => reusableWrapper.replace(record)
+  }
+
+  override def write(record: InternalRow): Unit = {
+    batchTableWrite.write(rowConverter.apply(record))
+  }
+
+  override def commit(): WriterCommitMessage = {
+    try {
+      val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq
+      TaskCommit(commitMessages)
+    } finally {
+      close()
+    }
+  }
+
+  override def abort(): Unit = close()
+
+  override def close(): Unit = {
+    try {
+      batchTableWrite.close()
+      ioManager.close()
+    } catch {
+      case e: Exception => throw new RuntimeException(e)
+    }
+  }
+}
+
+class TaskCommit private (
+    private val serializedMessageBytes: Seq[Array[Byte]]
+) extends WriterCommitMessage {
+  def commitMessages(): Seq[CommitMessage] = {
+    val deserializer = new CommitMessageSerializer()
+    serializedMessageBytes.map {
+      bytes =>
+        Try(deserializer.deserialize(deserializer.getVersion, bytes)) match {
+          case Success(msg) => msg
+          case Failure(e: IOException) => throw new UncheckedIOException(e)
+          case Failure(e) => throw e
+        }
+    }
+  }
+}
+
+object TaskCommit {
+  def apply(commitMessages: Seq[CommitMessage]): TaskCommit = {
+    val serializer = new CommitMessageSerializer()
+    val serializedBytes: Seq[Array[Byte]] = Option(commitMessages)
+      .filter(_.nonEmpty)
+      .map(_.map {
+        msg =>
+          Try(serializer.serialize(msg)) match {
+            case Success(serializedBytes) => serializedBytes
+            case Failure(e: IOException) => throw new UncheckedIOException(e)
+            case Failure(e) => throw e
+          }
+      })
+      .getOrElse(Seq.empty)
+
+    new TaskCommit(serializedBytes)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
new file mode 100644
index 0000000000..a8b9a1c211
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, 
SupportsOverwrite, WriteBuilder}
+import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.types.StructType
+
+class SparkV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
+  extends BaseWriteBuilder(table)
+  with SupportsOverwrite
+  with SupportsDynamicOverwrite {
+
+  private var overwriteDynamic = false
+  private var overwritePartitions: Map[String, String] = null
+
+  override def build =
+    new SparkV2Write(table, overwriteDynamic, 
Option.apply(overwritePartitions), writeSchema)
+
+  override def overwrite(filters: Array[Filter]): WriteBuilder = {
+    if (overwriteDynamic) {
+      throw new IllegalArgumentException("Cannot overwrite dynamically and by 
filter both")
+    }
+
+    failIfCanNotOverwrite(filters)
+
+    val conjunctiveFilters = if (filters.nonEmpty) {
+      Some(filters.reduce((l, r) => And(l, r)))
+    } else {
+      None
+    }
+
+    if (isTruncate(conjunctiveFilters.get)) {
+      overwritePartitions = Map.empty[String, String]
+    } else {
+      overwritePartitions =
+        convertPartitionFilterToMap(conjunctiveFilters.get, 
table.schema.logicalPartitionType())
+    }
+
+    this
+  }
+
+  override def overwriteDynamicPartitions(): WriteBuilder = {
+    if (overwritePartitions != null) {
+      throw new IllegalArgumentException("Cannot overwrite dynamically and by 
filter both")
+    }
+
+    overwriteDynamic = true
+    this
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
similarity index 95%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
index fd43decfc6..d8c8dc942c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.write
 
 import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SaveMode
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.table.FileStoreTable
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
similarity index 54%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
index fd43decfc6..bfb76552fb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
@@ -16,27 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.write
 
 import org.apache.paimon.options.Options
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.spark.{InsertInto, Overwrite, SaveMode}
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.connector.write.V1Write
-import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
+import org.apache.spark.sql.sources._
 
-/** Spark [[V1Write]], it is required to use v1 write for grouping by bucket. 
*/
-class SparkWrite(val table: FileStoreTable, saveMode: SaveMode, options: 
Options) extends V1Write {
+class SparkWriteBuilder(table: FileStoreTable, options: Options)
+  extends BaseWriteBuilder(table)
+  with SupportsOverwrite {
 
-  override def toInsertableRelation: InsertableRelation = {
-    (data: DataFrame, overwrite: Boolean) =>
-      {
-        WriteIntoPaimonTable(table, saveMode, data, 
options).run(data.sparkSession)
-      }
-  }
+  private var saveMode: SaveMode = InsertInto
+
+  override def build = new SparkWrite(table, saveMode, options)
 
-  override def toString: String = {
-    s"table: ${table.fullName()}, saveMode: $saveMode, options: 
${options.toMap}"
+  override def overwrite(filters: Array[Filter]): WriteBuilder = {
+    failIfCanNotOverwrite(filters)
+
+    val conjunctiveFilters = if (filters.nonEmpty) {
+      Some(filters.reduce((l, r) => And(l, r)))
+    } else {
+      None
+    }
+    this.saveMode = Overwrite(conjunctiveFilters)
+    this
   }
+
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
new file mode 100644
index 0000000000..b55d5ca85a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
+
+import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, 
Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions.{Expression, Expressions, 
SortOrder}
+
+import scala.collection.JavaConverters._
+
+/** Distribution requirements of Spark write. */
+case class SparkWriteRequirement(distribution: Distribution, ordering: 
Array[SortOrder])
+
+object SparkWriteRequirement {
+  private val EMPTY_ORDERING: Array[SortOrder] = Array.empty
+  private val EMPTY: SparkWriteRequirement =
+    SparkWriteRequirement(Distributions.unspecified(), EMPTY_ORDERING)
+
+  def apply(table: FileStoreTable): SparkWriteRequirement = {
+    val bucketSpec = table.bucketSpec()
+    val bucketTransforms = bucketSpec.getBucketMode match {
+      case BucketMode.HASH_FIXED =>
+        Seq(
+          Expressions.bucket(
+            bucketSpec.getNumBuckets,
+            bucketSpec.getBucketKeys.asScala.map(quote).toArray: _*))
+      case BucketMode.BUCKET_UNAWARE =>
+        Seq.empty
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Unsupported bucket mode ${bucketSpec.getBucketMode}")
+    }
+
+    val partitionTransforms =
+      table.schema().partitionKeys().asScala.map(key => 
Expressions.identity(quote(key)))
+    val clusteringExpressions =
+      (partitionTransforms ++ 
bucketTransforms).map(identity[Expression]).toArray
+
+    if (clusteringExpressions.isEmpty) {
+      EMPTY
+    } else {
+      val distribution: ClusteredDistribution =
+        Distributions.clustered(clusteringExpressions)
+      SparkWriteRequirement(distribution, EMPTY_ORDERING)
+    }
+  }
+
+  private def quote(columnName: String): String =
+    s"`${columnName.replace("`", "``")}`"
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
index 14bc67ab1e..cf02654063 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
@@ -66,6 +66,7 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -96,6 +97,8 @@ public class BucketFunctionTest {
     private static final int COMPACTED_DECIMAL_SCALE = 9;
     private static final int TIMESTAMP_PRECISION = 6;
 
+    private static final String TIMESTAMP_COL_PRECISION_3 = 
"timestamp_col_precision_3";
+
     private static final RowType ROW_TYPE =
             new RowType(
                     Arrays.asList(
@@ -124,12 +127,13 @@ public class BucketFunctionTest {
                                     new 
LocalZonedTimestampType(TIMESTAMP_PRECISION)),
                             new DataField(
                                     12, BINARY_COL, new 
VarBinaryType(VarBinaryType.MAX_LENGTH)),
-                            new DataField(13, ID_COL, new IntType())));
+                            new DataField(13, ID_COL, new IntType()),
+                            new DataField(14, TIMESTAMP_COL_PRECISION_3, new 
TimestampType(3))));
 
     private static final InternalRow NULL_PAIMON_ROW =
             GenericRow.of(
-                    null, null, null, null, null, null, null, null, null, 
null, null, null, null,
-                    0);
+                    null, null, null, null, null, null, null, null, null, 
null, null, null, null, 0,
+                    null);
 
     private static InternalRow randomPaimonInternalRow() {
         Random random = new Random();
@@ -152,7 +156,8 @@ public class BucketFunctionTest {
                 Timestamp.fromMicros(System.nanoTime() / 1000),
                 Timestamp.fromMicros(System.nanoTime() / 1000),
                 UUID.randomUUID().toString().getBytes(),
-                1);
+                1,
+                Timestamp.fromEpochMillis(System.currentTimeMillis()));
     }
 
     private static final String TABLE_NAME = "test_bucket";
@@ -199,7 +204,9 @@ public class BucketFunctionTest {
                                             CoreOptions.BUCKET_KEY.key(),
                                             String.join(",", bucketColumns),
                                             CoreOptions.BUCKET.key(),
-                                            String.valueOf(NUM_BUCKETS)),
+                                            String.valueOf(NUM_BUCKETS),
+                                            "file.format",
+                                            "avro"),
                                     ""));
             FileStoreTable storeTable =
                     FileStoreTableFactory.create(LocalFileIO.create(), 
tablePath, tableSchema);
@@ -220,11 +227,6 @@ public class BucketFunctionTest {
         }
     }
 
-    @Test
-    public void test() {
-        validateSparkBucketFunction(INTEGER_COL);
-    }
-
     private static void validateSparkBucketFunction(String... bucketColumns) {
         setupTable(bucketColumns);
         spark.sql(
@@ -307,10 +309,24 @@ public class BucketFunctionTest {
         while (bucketColumns == null || bucketColumns.length < 2) {
             bucketColumns =
                     allColumns.stream()
+                            .filter(e -> 
!Objects.equals(TIMESTAMP_COL_PRECISION_3, e))
                             .filter(e -> 
ThreadLocalRandom.current().nextBoolean())
                             .toArray(String[]::new);
         }
 
         validateSparkBucketFunction(bucketColumns);
     }
+
+    @Test
+    public void testTimestampPrecisionNotEqualToSpark() {
+        setupTable(TIMESTAMP_COL_PRECISION_3);
+        spark.sql(
+                        String.format(
+                                "SELECT id_col, __paimon_bucket as 
expected_bucket, paimon.bucket(%s, %s) FROM %s",
+                                NUM_BUCKETS,
+                                String.join(",", TIMESTAMP_COL_PRECISION_3),
+                                TABLE_NAME))
+                .collectAsList()
+                .forEach(row -> 
Assertions.assertThat(row.getInt(2)).isNotEqualTo(row.get(1)));
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index 8bc640bf37..a84fb8d4c8 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -40,6 +40,12 @@ class SparkWriteWithNoExtensionITCase extends 
SparkWriteITCase {
   }
 }
 
+class SparkV2WriteITCase extends SparkWriteITCase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
+
 class SparkWriteITCase extends PaimonSparkTestBase {
 
   test("Paimon Write : Postpone Bucket") {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
new file mode 100644
index 0000000000..02a5b9a830
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
AppendDataExecV1}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+
+class V2WriteRequireDistributionTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelper {
+
+  test("V2 write primary key table") {
+
+    withTable("t1") {
+      spark.sql(
+        "CREATE TABLE t1 (type STRING, id INT, data STRING) partitioned by 
(type) TBLPROPERTIES ('primary-key' = 'type,id', 'bucket'='2', 
'file.format'='avro')")
+      val query =
+        "INSERT INTO t1 VALUES ('foo', 1, 'x1'), ('foo',1, 'X1'), ('foo', 2, 
'x2'), ('foo', 2, 'X2'), ('bar', 3, 'x3'), ('bar', 3, 'X3')"
+
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+        val df = spark.sql(query)
+        val nodes = collect(
+          
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan)
 {
+          case shuffle: ShuffleExchangeLike => shuffle
+          case append: AppendDataExec => append
+        }
+
+        assert(nodes.size == 2)
+
+        val node1 = nodes(0)
+        assert(
+          node1.isInstanceOf[AppendDataExec] &&
+            node1.toString.contains("PaimonWrite(table=test.t1"),
+          s"Expected AppendDataExec with specific paimon write, but got: 
$node1"
+        )
+
+        val node2 = nodes(1)
+        assert(
+          node2.isInstanceOf[ShuffleExchangeLike] &&
+            
node2.toString.contains("org.apache.paimon.spark.catalog.functions.BucketFunction"),
+          s"Expected ShuffleExchangeLike with BucketFunction, but got: $node2"
+        )
+
+        checkAnswer(
+          spark.sql("SELECT * FROM t1"),
+          Seq(
+            Row("foo", 1, "X1"),
+            Row("foo", 2, "X2"),
+            Row("bar", 3, "X3")
+          ))
+      }
+    }
+  }
+
+  test("V2 write append only table") {
+
+    withTable("t1") {
+      spark.sql(
+        "CREATE TABLE t1 (type STRING, id INT, data STRING) partitioned by 
(type) TBLPROPERTIES ('bucket-key' = 'id', 'bucket'='2', 'file.format'='avro')")
+      val query =
+        "INSERT INTO t1 VALUES ('foo', 1, 'x1'), ('foo',1, 'X1'), ('foo', 2, 
'x2'), ('foo', 2, 'X2'), ('bar', 3, 'x3'), ('bar', 3, 'X3')"
+
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+        val df = spark.sql(query)
+        val nodes = collect(
+          
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan)
 {
+          case shuffle: ShuffleExchangeLike => shuffle
+          case append: AppendDataExec => append
+        }
+
+        assert(nodes.size == 2)
+
+        val node1 = nodes(0)
+        assert(
+          node1.isInstanceOf[AppendDataExec] &&
+            node1.toString.contains("PaimonWrite(table=test.t1"),
+          s"Expected AppendDataExec with specific paimon write, but got: 
$node1"
+        )
+
+        val node2 = nodes(1)
+        assert(
+          node2.isInstanceOf[ShuffleExchangeLike] &&
+            
node2.toString.contains("org.apache.paimon.spark.catalog.functions.BucketFunction"),
+          s"Expected ShuffleExchangeLike with BucketFunction, but got: $node2"
+        )
+      }
+
+      checkAnswer(
+        spark.sql("SELECT * FROM t1"),
+        Seq(
+          Row("foo", 1, "x1"),
+          Row("foo", 1, "X1"),
+          Row("foo", 2, "x2"),
+          Row("foo", 2, "X2"),
+          Row("bar", 3, "x3"),
+          Row("bar", 3, "X3")
+        ))
+    }
+  }
+
+  test("Fallback to v1 write") {
+
+    withTable("t1") {
+      spark.sql(
+        "CREATE TABLE t1 (id INT, data STRING) partitioned by(data) 
TBLPROPERTIES ('primary-key' = 'id', 'bucket'='-1')")
+
+      val query = "INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 
'x4'), (5, 'x5')"
+
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+        val df = spark.sql(query)
+        val nodes = collect(
+          
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan)
 {
+          case append: AppendDataExecV1 => append
+        }
+
+        assert(nodes.size == 1)
+        val node1 = nodes(0)
+        assert(
+          node1.isInstanceOf[AppendDataExecV1] &&
+            node1.toString.contains("AppendDataExecV1 
PrimaryKeyFileStoreTable[test.t1]"),
+          s"Expected AppendDataExec with specific paimon write, but got: 
$node1"
+        )
+      }
+    }
+  }
+}


Reply via email to