This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4021e5bc96 [spark] Add spark v2 write (#5531)
4021e5bc96 is described below
commit 4021e5bc96f30e7bfb07eb27d1a6440baa47d91c
Author: Yujiang Zhong <[email protected]>
AuthorDate: Sat May 3 21:22:48 2025 +0800
[spark] Add spark v2 write (#5531)
---
.../analysis/expressions/ExpressionHelper.scala | 37 ++++
.../apache/paimon/spark/SparkConnectorOptions.java | 7 +
.../scala/org/apache/paimon/spark/SparkTable.scala | 45 ++++-
.../analysis/expressions/ExpressionHelper.scala | 39 ++++-
.../paimon/spark/commands/PaimonCommand.scala | 38 +---
.../org/apache/paimon/spark/util/OptionUtils.scala | 4 +
.../BaseWriteBuilder.scala} | 33 +---
.../apache/paimon/spark/write/SparkV2Write.scala | 193 +++++++++++++++++++++
.../paimon/spark/write/SparkV2WriteBuilder.scala | 69 ++++++++
.../paimon/spark/{ => write}/SparkWrite.scala | 3 +-
.../SparkWriteBuilder.scala} | 36 ++--
.../paimon/spark/write/SparkWriteRequirement.scala | 67 +++++++
.../catalog/functions/BucketFunctionTest.java | 36 ++--
.../apache/paimon/spark/sql/SparkWriteITCase.scala | 6 +
.../spark/sql/V2WriteRequireDistributionTest.scala | 145 ++++++++++++++++
15 files changed, 664 insertions(+), 94 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 56223c36cd..f4890f84bd 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -20,10 +20,12 @@ package org.apache.paimon.spark.catalyst.analysis.expressions
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.spark.write.SparkWriteBuilder
import org.apache.paimon.types.RowType
import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, EqualNullSafe, EqualTo, Filter => SourceFilter}
trait ExpressionHelper extends ExpressionHelperBase {
@@ -52,4 +54,39 @@ trait ExpressionHelper extends ExpressionHelperBase {
Some(PredicateBuilder.and(predicates: _*))
}
}
+
+ /**
+ * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
+ * methods where the `AlwaysTrue` Filter is used.
+ */
+ def isTruncate(filter: SourceFilter): Boolean = {
+ val filters = splitConjunctiveFilters(filter)
+ filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
+ }
+
+ /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
+ def convertPartitionFilterToMap(
+ filter: SourceFilter,
+ partitionRowType: RowType): Map[String, String] = {
+ // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
+ val converter = new SparkFilterConverter(partitionRowType)
+ splitConjunctiveFilters(filter).map {
+ case EqualNullSafe(attribute, value) =>
+ (attribute, converter.convertString(attribute, value))
+ case EqualTo(attribute, value) =>
+ (attribute, converter.convertString(attribute, value))
+ case _ =>
+ // Should not happen
+ throw new RuntimeException(
+ s"Only support Overwrite filters with Equal and EqualNullSafe, but
got: $filter")
+ }.toMap
+ }
+
+ private def splitConjunctiveFilters(filter: SourceFilter): Seq[SourceFilter] = {
+ filter match {
+ case SourceAnd(filter1, filter2) =>
+ splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
+ case other => other :: Nil
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 539f5a11e3..f2c0877bb4 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -46,6 +46,13 @@ public class SparkConnectorOptions {
.withDescription(
"If true, allow to merge data types if the two
types meet the rules for explicit casting.");
+ public static final ConfigOption<Boolean> USE_V2_WRITE =
+ key("write.use-v2-write")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, v2 write will be used. Currently, only
HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1
write for other bucket modes. Currently, Spark V2 write does not support
TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.");
+
public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
key("read.stream.maxFilesPerTrigger")
.intType()
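
Note: the option added above can be set per table ('write.use-v2-write') or, as the tests in this commit do, through the Spark session conf with the 'spark.paimon.' prefix. A minimal sketch of enabling it; the table name and rows below are illustrative only and not part of this commit:

    // Assumes a Paimon catalog is already configured; `t` is a hypothetical table.
    spark.conf.set("spark.paimon.write.use-v2-write", "true")
    spark.sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('bucket-key' = 'id', 'bucket' = '2')")
    spark.sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')") // goes through the DataSourceV2 batch write path
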
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index b9a90d8b5b..bdc0d2cc29 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -20,8 +20,11 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.{DataTable, FileStoreTable, KnownSplitsTable, Table}
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.spark.write.{SparkV2WriteBuilder, SparkWriteBuilder}
+import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, KnownSplitsTable, Table}
import org.apache.paimon.utils.StringUtils
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, TableCatalog}
@@ -43,6 +46,26 @@ case class SparkTable(table: Table)
with SupportsMetadataColumns
with PaimonPartitionManagement {
+ private lazy val useV2Write: Boolean = {
+ val v2WriteConfigured = OptionUtils.useV2Write
+ v2WriteConfigured && supportsV2Write
+ }
+
+ private def supportsV2Write: Boolean = {
+ table match {
+ case storeTable: FileStoreTable =>
+ storeTable.bucketMode() match {
+ case BucketMode.HASH_FIXED =>
storeTable.coreOptions().bucket() > 0 && BucketFunction.supportsTable(storeTable)
+ case BucketMode.BUCKET_UNAWARE =>
+ storeTable.coreOptions().bucket() == BucketMode.UNAWARE_BUCKET
+ case _ => false
+ }
+
+ case _ => false
+ }
+ }
+
def getTable: Table = table
override def name: String = table.fullName
@@ -73,14 +96,21 @@ case class SparkTable(table: Table)
}
override def capabilities: JSet[TableCapability] = {
- JEnumSet.of(
- TableCapability.ACCEPT_ANY_SCHEMA,
+ val capabilities = JEnumSet.of(
TableCapability.BATCH_READ,
- TableCapability.V1_BATCH_WRITE,
TableCapability.OVERWRITE_BY_FILTER,
TableCapability.OVERWRITE_DYNAMIC,
TableCapability.MICRO_BATCH_READ
)
+
+ if (useV2Write) {
+ capabilities.add(TableCapability.BATCH_WRITE)
+ } else {
+ capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
+ capabilities.add(TableCapability.V1_BATCH_WRITE)
+ }
+
+ capabilities
}
override def metadataColumns: Array[MetadataColumn] = {
@@ -105,7 +135,12 @@ case class SparkTable(table: Table)
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
table match {
case fileStoreTable: FileStoreTable =>
- new SparkWriteBuilder(fileStoreTable, Options.fromMap(info.options))
+ val options = Options.fromMap(info.options)
+ if (useV2Write) {
+ new SparkV2WriteBuilder(fileStoreTable, info.schema())
+ } else {
+ new SparkWriteBuilder(fileStoreTable, options)
+ }
case _ =>
throw new RuntimeException("Only FileStoreTable can be written.")
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index fcece0a262..2cb739ee6d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -19,8 +19,9 @@
package org.apache.paimon.spark.catalyst.analysis.expressions
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
-import org.apache.paimon.spark.SparkV2FilterConverter
+import org.apache.paimon.spark.{SparkFilterConverter, SparkV2FilterConverter}
import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.write.SparkWriteBuilder
import org.apache.paimon.types.RowType
import org.apache.spark.sql.{Column, SparkSession}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, E
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, EqualNullSafe, EqualTo, Filter => SourceFilter}
import org.apache.spark.sql.types.{DataType, NullType}
/** An expression helper. */
@@ -64,6 +66,41 @@ trait ExpressionHelper extends ExpressionHelperBase {
Some(PredicateBuilder.and(predicates: _*))
}
}
+
+ /**
+ * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
+ * methods where the `AlwaysTrue` Filter is used.
+ */
+ def isTruncate(filter: SourceFilter): Boolean = {
+ val filters = splitConjunctiveFilters(filter)
+ filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
+ }
+
+ /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
+ def convertPartitionFilterToMap(
+ filter: SourceFilter,
+ partitionRowType: RowType): Map[String, String] = {
+ // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
+ val converter = new SparkFilterConverter(partitionRowType)
+ splitConjunctiveFilters(filter).map {
+ case EqualNullSafe(attribute, value) =>
+ (attribute, converter.convertString(attribute, value))
+ case EqualTo(attribute, value) =>
+ (attribute, converter.convertString(attribute, value))
+ case _ =>
+ // Should not happen
+ throw new RuntimeException(
+ s"Only support Overwrite filters with Equal and EqualNullSafe, but
got: $filter")
+ }.toMap
+ }
+
+ private def splitConjunctiveFilters(filter: SourceFilter): Seq[SourceFilter] = {
+ filter match {
+ case SourceAnd(filter1, filter2) =>
+ splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
+ case other => other :: Nil
+ }
+ }
}
trait ExpressionHelperBase extends PredicateHelper {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 0b466c8efd..907d8be85c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletion
import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
-import org.apache.paimon.spark.{SparkFilterConverter, SparkTable}
+import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.schema.PaimonMetadataColumn
@@ -31,7 +31,6 @@ import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.table.{BucketMode, FileStoreTable, KnownSplitsTable}
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
-import org.apache.paimon.types.RowType
import org.apache.paimon.utils.SerializationUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -69,41 +68,6 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
}
- /**
- * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
- * methods where the `AlwaysTrue` Filter is used.
- */
- def isTruncate(filter: Filter): Boolean = {
- val filters = splitConjunctiveFilters(filter)
- filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
- }
-
- /** See [[ org.apache.paimon.spark.SparkWriteBuilder#failIfCanNotOverwrite]] */
- def convertPartitionFilterToMap(
- filter: Filter,
- partitionRowType: RowType): Map[String, String] = {
- // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
- val converter = new SparkFilterConverter(partitionRowType)
- splitConjunctiveFilters(filter).map {
- case EqualNullSafe(attribute, value) =>
- (attribute, converter.convertString(attribute, value))
- case EqualTo(attribute, value) =>
- (attribute, converter.convertString(attribute, value))
- case _ =>
- // Should not happen
- throw new RuntimeException(
- s"Only support Overwrite filters with Equal and EqualNullSafe, but
got: $filter")
- }.toMap
- }
-
- private def splitConjunctiveFilters(filter: Filter): Seq[Filter] = {
- filter match {
- case And(filter1, filter2) =>
- splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
- case other => other :: Nil
- }
- }
-
/** Gets a relative path against the table path. */
protected def relativePath(absolutePath: String): String = {
val location = table.location().toUri
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 7315730faf..ce66bb838d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -59,6 +59,10 @@ object OptionUtils extends SQLConfHelper {
}
}
+ def useV2Write(): Boolean = {
+ getOptionString(SparkConnectorOptions.USE_V2_WRITE).toBoolean
+ }
+
def extractCatalogName(): Option[String] = {
val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX)
val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseWriteBuilder.scala
similarity index 77%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseWriteBuilder.scala
index 74a474b8c3..663193257e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseWriteBuilder.scala
@@ -16,27 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.write
-import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
-import org.apache.spark.sql.sources.{AlwaysFalse, AlwaysTrue, And, EqualNullSafe, EqualTo, Filter, Not, Or}
+import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.sources._
import scala.collection.JavaConverters._
-private class SparkWriteBuilder(table: FileStoreTable, options: Options)
+abstract class BaseWriteBuilder(table: FileStoreTable)
extends WriteBuilder
- with SupportsOverwrite
+ with ExpressionHelper
with SQLConfHelper {
- private var saveMode: SaveMode = InsertInto
-
- override def build = new SparkWrite(table, saveMode, options)
-
- private def failWithReason(filter: Filter): Unit = {
+ protected def failWithReason(filter: Filter): Unit = {
throw new RuntimeException(
s"Only support Overwrite filters with Equal and EqualNullSafe, but got:
$filter")
}
@@ -55,7 +51,7 @@ private class SparkWriteBuilder(table: FileStoreTable, options: Options)
// `SupportsOverwrite#canOverwrite` is added since Spark 3.4.0.
// We do this checking by self to work with previous Spark version.
- private def failIfCanNotOverwrite(filters: Array[Filter]): Unit = {
+ protected def failIfCanNotOverwrite(filters: Array[Filter]): Unit = {
// For now, we only support overwrite with two cases:
// - overwrite with partition columns to be compatible with v1 insert overwrite
// See [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveInsertInto#staticDeleteExpression]].
@@ -81,17 +77,4 @@ private class SparkWriteBuilder(table: FileStoreTable, options: Options)
}
filters.foreach(validateFilter)
}
-
- override def overwrite(filters: Array[Filter]): WriteBuilder = {
- failIfCanNotOverwrite(filters)
-
- val conjunctiveFilters = if (filters.nonEmpty) {
- Some(filters.reduce((l, r) => And(l, r)))
- } else {
- None
- }
- this.saveMode = Overwrite(conjunctiveFilters)
- this
- }
-
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
new file mode 100644
index 0000000000..3f2018060e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage, CommitMessageSerializer}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.distributions.Distribution
+import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.write._
+import org.apache.spark.sql.types.StructType
+
+import java.io.{IOException, UncheckedIOException}
+
+import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success, Try}
+
+class SparkV2Write(
+ storeTable: FileStoreTable,
+ overwriteDynamic: Boolean,
+ overwritePartitions: Option[Map[String, String]],
+ writeSchema: StructType
+) extends Write
+ with RequiresDistributionAndOrdering
+ with Logging {
+
+ assert(
+ !(overwriteDynamic && overwritePartitions.nonEmpty),
+ "Cannot overwrite dynamically and by filter both")
+
+ private val table =
+ storeTable.copy(
+ Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> overwriteDynamic.toString).asJava)
+
+ private val batchWriteBuilder = {
+ val builder = table.newBatchWriteBuilder()
+ overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
+ builder
+ }
+
+ private val writeRequirement = SparkWriteRequirement(table)
+
+ override def requiredDistribution(): Distribution = {
+ val distribution = writeRequirement.distribution
+ logInfo(s"Requesting $distribution as write distribution for table
${table.name()}")
+ distribution
+ }
+
+ override def requiredOrdering(): Array[SortOrder] = {
+ val ordering = writeRequirement.ordering
+ logInfo(s"Requesting ${ordering.mkString(",")} as write ordering for table
${table.name()}")
+ ordering
+ }
+
+ override def toBatch: BatchWrite = new PaimonBatchWrite
+
+ override def toString: String =
+ if (overwriteDynamic)
+ s"PaimonWrite(table=${table.fullName()}, overwriteDynamic=true)"
+ else
+ s"PaimonWrite(table=${table.fullName()},
overwritePartitions=$overwritePartitions)"
+
+ private class PaimonBatchWrite extends BatchWrite {
+ override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+ WriterFactory(writeSchema, batchWriteBuilder)
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Committing to table ${table.name()}")
+ val batchTableCommit = batchWriteBuilder.newCommit()
+
+ val commitMessages = messages.collect {
+ case taskCommit: TaskCommit => taskCommit.commitMessages
+ case other =>
+ throw new IllegalArgumentException(s"${other.getClass.getName} is
not supported")
+ }.flatten
+
+ try {
+ val start = System.currentTimeMillis()
+ batchTableCommit.commit(commitMessages.toList.asJava)
+ logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+ } finally {
+ batchTableCommit.close()
+ }
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ // TODO clean uncommitted files
+ }
+ }
+}
+
+private case class WriterFactory(writeSchema: StructType, batchWriteBuilder: BatchWriteBuilder)
+ extends DataWriterFactory {
+
+ override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+ val batchTableWrite = batchWriteBuilder.newWrite()
+ new PaimonDataWriter(batchTableWrite, writeSchema)
+ }
+}
+
+private class PaimonDataWriter(batchTableWrite: BatchTableWrite, writeSchema: StructType)
+ extends DataWriter[InternalRow] {
+
+ private val ioManager = SparkUtils.createIOManager()
+ batchTableWrite.withIOManager(ioManager)
+
+ private val rowConverter: InternalRow => SparkInternalRowWrapper = {
+ val numFields = writeSchema.fields.length
+ val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, numFields)
+ record => reusableWrapper.replace(record)
+ }
+
+ override def write(record: InternalRow): Unit = {
+ batchTableWrite.write(rowConverter.apply(record))
+ }
+
+ override def commit(): WriterCommitMessage = {
+ try {
+ val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq
+ TaskCommit(commitMessages)
+ } finally {
+ close()
+ }
+ }
+
+ override def abort(): Unit = close()
+
+ override def close(): Unit = {
+ try {
+ batchTableWrite.close()
+ ioManager.close()
+ } catch {
+ case e: Exception => throw new RuntimeException(e)
+ }
+ }
+}
+
+class TaskCommit private (
+ private val serializedMessageBytes: Seq[Array[Byte]]
+) extends WriterCommitMessage {
+ def commitMessages(): Seq[CommitMessage] = {
+ val deserializer = new CommitMessageSerializer()
+ serializedMessageBytes.map {
+ bytes =>
+ Try(deserializer.deserialize(deserializer.getVersion, bytes)) match {
+ case Success(msg) => msg
+ case Failure(e: IOException) => throw new UncheckedIOException(e)
+ case Failure(e) => throw e
+ }
+ }
+ }
+}
+
+object TaskCommit {
+ def apply(commitMessages: Seq[CommitMessage]): TaskCommit = {
+ val serializer = new CommitMessageSerializer()
+ val serializedBytes: Seq[Array[Byte]] = Option(commitMessages)
+ .filter(_.nonEmpty)
+ .map(_.map {
+ msg =>
+ Try(serializer.serialize(msg)) match {
+ case Success(serializedBytes) => serializedBytes
+ case Failure(e: IOException) => throw new UncheckedIOException(e)
+ case Failure(e) => throw e
+ }
+ })
+ .getOrElse(Seq.empty)
+
+ new TaskCommit(serializedBytes)
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
new file mode 100644
index 0000000000..a8b9a1c211
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, WriteBuilder}
+import org.apache.spark.sql.sources.{And, Filter}
+import org.apache.spark.sql.types.StructType
+
+class SparkV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
+ extends BaseWriteBuilder(table)
+ with SupportsOverwrite
+ with SupportsDynamicOverwrite {
+
+ private var overwriteDynamic = false
+ private var overwritePartitions: Map[String, String] = null
+
+ override def build =
+ new SparkV2Write(table, overwriteDynamic, Option.apply(overwritePartitions), writeSchema)
+
+ override def overwrite(filters: Array[Filter]): WriteBuilder = {
+ if (overwriteDynamic) {
+ throw new IllegalArgumentException("Cannot overwrite dynamically and by
filter both")
+ }
+
+ failIfCanNotOverwrite(filters)
+
+ val conjunctiveFilters = if (filters.nonEmpty) {
+ Some(filters.reduce((l, r) => And(l, r)))
+ } else {
+ None
+ }
+
+ if (isTruncate(conjunctiveFilters.get)) {
+ overwritePartitions = Map.empty[String, String]
+ } else {
+ overwritePartitions =
+ convertPartitionFilterToMap(conjunctiveFilters.get, table.schema.logicalPartitionType())
+ }
+
+ this
+ }
+
+ override def overwriteDynamicPartitions(): WriteBuilder = {
+ if (overwritePartitions != null) {
+ throw new IllegalArgumentException("Cannot overwrite dynamically and by
filter both")
+ }
+
+ overwriteDynamic = true
+ this
+ }
+}
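
For reference, the two overwrite paths handled by the builder above correspond to the usual SQL forms: a full INSERT OVERWRITE arrives as the `AlwaysTrue` filter (truncate, empty partition map), a static partition spec arrives as equality filters (`EqualTo`/`EqualNullSafe`) that `convertPartitionFilterToMap` turns into a partition map, and with spark.sql.sources.partitionOverwriteMode=dynamic Spark calls `overwriteDynamicPartitions()` instead. A rough sketch against hypothetical tables `t` (partitioned by `dt`) and `src`, not part of this commit:

    // Illustrative only; both table names are assumptions.
    spark.sql("INSERT OVERWRITE t SELECT * FROM src")                               // overwrite(AlwaysTrue): truncate, overwritePartitions = empty map
    spark.sql("INSERT OVERWRITE t PARTITION (dt = '2025-05-03') SELECT * FROM src") // equality filter on dt: Map("dt" -> "2025-05-03")
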
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
similarity index 95%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
index fd43decfc6..d8c8dc942c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
@@ -16,9 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.write
import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SaveMode
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.table.FileStoreTable
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
similarity index 54%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
index fd43decfc6..bfb76552fb 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
@@ -16,27 +16,33 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.write
import org.apache.paimon.options.Options
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.spark.{InsertInto, Overwrite, SaveMode}
import org.apache.paimon.table.FileStoreTable
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.connector.write.V1Write
-import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
+import org.apache.spark.sql.sources._
-/** Spark [[V1Write]], it is required to use v1 write for grouping by bucket. */
-class SparkWrite(val table: FileStoreTable, saveMode: SaveMode, options: Options) extends V1Write {
+class SparkWriteBuilder(table: FileStoreTable, options: Options)
+ extends BaseWriteBuilder(table)
+ with SupportsOverwrite {
- override def toInsertableRelation: InsertableRelation = {
- (data: DataFrame, overwrite: Boolean) =>
- {
- WriteIntoPaimonTable(table, saveMode, data, options).run(data.sparkSession)
- }
- }
+ private var saveMode: SaveMode = InsertInto
+
+ override def build = new SparkWrite(table, saveMode, options)
- override def toString: String = {
- s"table: ${table.fullName()}, saveMode: $saveMode, options:
${options.toMap}"
+ override def overwrite(filters: Array[Filter]): WriteBuilder = {
+ failIfCanNotOverwrite(filters)
+
+ val conjunctiveFilters = if (filters.nonEmpty) {
+ Some(filters.reduce((l, r) => And(l, r)))
+ } else {
+ None
+ }
+ this.saveMode = Overwrite(conjunctiveFilters)
+ this
}
+
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
new file mode 100644
index 0000000000..b55d5ca85a
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
+
+import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, Distribution, Distributions}
+import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
+
+import scala.collection.JavaConverters._
+
+/** Distribution requirements of Spark write. */
+case class SparkWriteRequirement(distribution: Distribution, ordering: Array[SortOrder])
+
+object SparkWriteRequirement {
+ private val EMPTY_ORDERING: Array[SortOrder] = Array.empty
+ private val EMPTY: SparkWriteRequirement =
+ SparkWriteRequirement(Distributions.unspecified(), EMPTY_ORDERING)
+
+ def apply(table: FileStoreTable): SparkWriteRequirement = {
+ val bucketSpec = table.bucketSpec()
+ val bucketTransforms = bucketSpec.getBucketMode match {
+ case BucketMode.HASH_FIXED =>
+ Seq(
+ Expressions.bucket(
+ bucketSpec.getNumBuckets,
+ bucketSpec.getBucketKeys.asScala.map(quote).toArray: _*))
+ case BucketMode.BUCKET_UNAWARE =>
+ Seq.empty
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Unsupported bucket mode ${bucketSpec.getBucketMode}")
+ }
+
+ val partitionTransforms =
+ table.schema().partitionKeys().asScala.map(key => Expressions.identity(quote(key)))
+ val clusteringExpressions =
+ (partitionTransforms ++ bucketTransforms).map(identity[Expression]).toArray
+
+ if (clusteringExpressions.isEmpty) {
+ EMPTY
+ } else {
+ val distribution: ClusteredDistribution =
+ Distributions.clustered(clusteringExpressions)
+ SparkWriteRequirement(distribution, EMPTY_ORDERING)
+ }
+ }
+
+ private def quote(columnName: String): String =
+ s"`${columnName.replace("`", "``")}`"
+}
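
For a fixed-bucket table, the requirement above clusters rows by the partition transforms plus a bucket transform over the bucket keys, which is what lets Spark plan a shuffle backed by Paimon's catalog-registered BucketFunction. A sketch of the value `apply` would build for a hypothetical table partitioned by `dt` with bucket key `id` and 'bucket' = '2' (the column names and bucket count are assumptions for illustration):

    import org.apache.spark.sql.connector.distributions.Distributions
    import org.apache.spark.sql.connector.expressions.{Expression, Expressions}

    // Partition keys become identity transforms, bucket keys a bucket transform.
    val clustering: Array[Expression] = Array(
      Expressions.identity("`dt`"),
      Expressions.bucket(2, "`id`"))
    val distribution = Distributions.clustered(clustering) // ordering stays empty
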
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
index 14bc67ab1e..cf02654063 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
@@ -66,6 +66,7 @@ import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -96,6 +97,8 @@ public class BucketFunctionTest {
private static final int COMPACTED_DECIMAL_SCALE = 9;
private static final int TIMESTAMP_PRECISION = 6;
+ private static final String TIMESTAMP_COL_PRECISION_3 = "timestamp_col_precision_3";
+
private static final RowType ROW_TYPE =
new RowType(
Arrays.asList(
@@ -124,12 +127,13 @@ public class BucketFunctionTest {
new LocalZonedTimestampType(TIMESTAMP_PRECISION)),
new DataField(
12, BINARY_COL, new VarBinaryType(VarBinaryType.MAX_LENGTH)),
- new DataField(13, ID_COL, new IntType())));
+ new DataField(13, ID_COL, new IntType()),
+ new DataField(14, TIMESTAMP_COL_PRECISION_3, new TimestampType(3))));
private static final InternalRow NULL_PAIMON_ROW =
GenericRow.of(
- null, null, null, null, null, null, null, null, null, null, null, null, null,
- 0);
+ null, null, null, null, null, null, null, null, null, null, null, null, null, 0,
+ null);
private static InternalRow randomPaimonInternalRow() {
Random random = new Random();
@@ -152,7 +156,8 @@ public class BucketFunctionTest {
Timestamp.fromMicros(System.nanoTime() / 1000),
Timestamp.fromMicros(System.nanoTime() / 1000),
UUID.randomUUID().toString().getBytes(),
- 1);
+ 1,
+ Timestamp.fromEpochMillis(System.currentTimeMillis()));
}
private static final String TABLE_NAME = "test_bucket";
@@ -199,7 +204,9 @@ public class BucketFunctionTest {
CoreOptions.BUCKET_KEY.key(),
String.join(",", bucketColumns),
CoreOptions.BUCKET.key(),
- String.valueOf(NUM_BUCKETS)),
+ String.valueOf(NUM_BUCKETS),
+ "file.format",
+ "avro"),
""));
FileStoreTable storeTable =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
@@ -220,11 +227,6 @@ public class BucketFunctionTest {
}
}
- @Test
- public void test() {
- validateSparkBucketFunction(INTEGER_COL);
- }
-
private static void validateSparkBucketFunction(String... bucketColumns) {
setupTable(bucketColumns);
spark.sql(
@@ -307,10 +309,24 @@ public class BucketFunctionTest {
while (bucketColumns == null || bucketColumns.length < 2) {
bucketColumns =
allColumns.stream()
+ .filter(e -> !Objects.equals(TIMESTAMP_COL_PRECISION_3, e))
.filter(e -> ThreadLocalRandom.current().nextBoolean())
.toArray(String[]::new);
}
validateSparkBucketFunction(bucketColumns);
}
+
+ @Test
+ public void testTimestampPrecisionNotEqualToSpark() {
+ setupTable(TIMESTAMP_COL_PRECISION_3);
+ spark.sql(
+ String.format(
+ "SELECT id_col, __paimon_bucket as
expected_bucket, paimon.bucket(%s, %s) FROM %s",
+ NUM_BUCKETS,
+ String.join(",", TIMESTAMP_COL_PRECISION_3),
+ TABLE_NAME))
+ .collectAsList()
+ .forEach(row -> Assertions.assertThat(row.getInt(2)).isNotEqualTo(row.get(1)));
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index 8bc640bf37..a84fb8d4c8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -40,6 +40,12 @@ class SparkWriteWithNoExtensionITCase extends SparkWriteITCase {
}
}
+class SparkV2WriteITCase extends SparkWriteITCase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
+
class SparkWriteITCase extends PaimonSparkTestBase {
test("Paimon Write : Postpone Bucket") {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
new file mode 100644
index 0000000000..02a5b9a830
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteRequireDistributionTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, AppendDataExecV1}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+
+class V2WriteRequireDistributionTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper {
+
+ test("V2 write primary key table") {
+
+ withTable("t1") {
+ spark.sql(
+ "CREATE TABLE t1 (type STRING, id INT, data STRING) partitioned by
(type) TBLPROPERTIES ('primary-key' = 'type,id', 'bucket'='2',
'file.format'='avro')")
+ val query =
+ "INSERT INTO t1 VALUES ('foo', 1, 'x1'), ('foo',1, 'X1'), ('foo', 2,
'x2'), ('foo', 2, 'X2'), ('bar', 3, 'x3'), ('bar', 3, 'X3')"
+
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ val df = spark.sql(query)
+ val nodes = collect(
+ df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan) {
+ case shuffle: ShuffleExchangeLike => shuffle
+ case append: AppendDataExec => append
+ }
+
+ assert(nodes.size == 2)
+
+ val node1 = nodes(0)
+ assert(
+ node1.isInstanceOf[AppendDataExec] &&
+ node1.toString.contains("PaimonWrite(table=test.t1"),
+ s"Expected AppendDataExec with specific paimon write, but got:
$node1"
+ )
+
+ val node2 = nodes(1)
+ assert(
+ node2.isInstanceOf[ShuffleExchangeLike] &&
+ node2.toString.contains("org.apache.paimon.spark.catalog.functions.BucketFunction"),
+ s"Expected ShuffleExchangeLike with BucketFunction, but got: $node2"
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM t1"),
+ Seq(
+ Row("foo", 1, "X1"),
+ Row("foo", 2, "X2"),
+ Row("bar", 3, "X3")
+ ))
+ }
+ }
+ }
+
+ test("V2 write append only table") {
+
+ withTable("t1") {
+ spark.sql(
+ "CREATE TABLE t1 (type STRING, id INT, data STRING) partitioned by
(type) TBLPROPERTIES ('bucket-key' = 'id', 'bucket'='2', 'file.format'='avro')")
+ val query =
+ "INSERT INTO t1 VALUES ('foo', 1, 'x1'), ('foo',1, 'X1'), ('foo', 2,
'x2'), ('foo', 2, 'X2'), ('bar', 3, 'x3'), ('bar', 3, 'X3')"
+
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ val df = spark.sql(query)
+ val nodes = collect(
+ df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan) {
+ case shuffle: ShuffleExchangeLike => shuffle
+ case append: AppendDataExec => append
+ }
+
+ assert(nodes.size == 2)
+
+ val node1 = nodes(0)
+ assert(
+ node1.isInstanceOf[AppendDataExec] &&
+ node1.toString.contains("PaimonWrite(table=test.t1"),
+ s"Expected AppendDataExec with specific paimon write, but got:
$node1"
+ )
+
+ val node2 = nodes(1)
+ assert(
+ node2.isInstanceOf[ShuffleExchangeLike] &&
+ node2.toString.contains("org.apache.paimon.spark.catalog.functions.BucketFunction"),
+ s"Expected ShuffleExchangeLike with BucketFunction, but got: $node2"
+ )
+ }
+
+ checkAnswer(
+ spark.sql("SELECT * FROM t1"),
+ Seq(
+ Row("foo", 1, "x1"),
+ Row("foo", 1, "X1"),
+ Row("foo", 2, "x2"),
+ Row("foo", 2, "X2"),
+ Row("bar", 3, "x3"),
+ Row("bar", 3, "X3")
+ ))
+ }
+ }
+
+ test("Fallback to v1 write") {
+
+ withTable("t1") {
+ spark.sql(
+ "CREATE TABLE t1 (id INT, data STRING) partitioned by(data)
TBLPROPERTIES ('primary-key' = 'id', 'bucket'='-1')")
+
+ val query = "INSERT INTO t1 VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4,
'x4'), (5, 'x5')"
+
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
+ val df = spark.sql(query)
+ val nodes = collect(
+ df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan) {
+ case append: AppendDataExecV1 => append
+ }
+
+ assert(nodes.size == 1)
+ val node1 = nodes(0)
+ assert(
+ node1.isInstanceOf[AppendDataExecV1] &&
+ node1.toString.contains("AppendDataExecV1
PrimaryKeyFileStoreTable[test.t1]"),
+ s"Expected AppendDataExec with specific paimon write, but got:
$node1"
+ )
+ }
+ }
+ }
+}
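
To check interactively that the v2 path was taken, the same plan inspection used in the tests above works from a spark-shell session; a rough sketch (table `t1` as created in the tests, plan string details vary by Spark version):

    import org.apache.spark.sql.execution.CommandResultExec

    spark.conf.set("spark.paimon.write.use-v2-write", "true")
    val df = spark.sql("INSERT INTO t1 VALUES ('foo', 1, 'x1')")
    // Expect an AppendData node ("PaimonWrite(table=...)") and a shuffle that
    // references org.apache.paimon.spark.catalog.functions.BucketFunction.
    println(df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan)
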