This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7d862898e0 [spark] Refactor spark v2 DELETE (#6851)
7d862898e0 is described below
commit 7d862898e0cee2efea52ab6e90f0b7c88b9d327b
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Dec 21 15:38:45 2025 +0800
[spark] Refactor spark v2 DELETE (#6851)
---
.../generated/spark_connector_configuration.html | 2 +-
.../paimon/spark/sql/DeleteFromTableTest.scala | 8 +
.../paimon/spark/sql/V2DeleteFromTableTest.scala | 28 ---
.../paimon/spark/sql/DeleteFromTableTest.scala | 8 +
.../apache/paimon/spark/SparkConnectorOptions.java | 2 +-
.../spark/PaimonSparkCopyOnWriteOperation.scala | 105 ---------
.../PaimonSparkRowLevelOperationBuilder.scala | 31 ---
.../scala/org/apache/paimon/spark/SparkTable.scala | 15 +-
.../catalyst/analysis/PaimonDeleteTable.scala | 44 ++--
.../paimon/spark/format/PaimonFormatTable.scala | 24 +-
.../{scan => rowops}/PaimonCopyOnWriteScan.scala | 45 ++--
.../rowops/PaimonSparkCopyOnWriteOperation.scala | 63 ++++++
.../paimon/spark/write/BaseV2WriteBuilder.scala | 43 ++--
.../paimon/spark/write/PaimonBatchWrite.scala | 133 +++++++++++
.../paimon/spark/write/PaimonV2DataWriter.scala | 80 +++++++
.../apache/paimon/spark/write/PaimonV2Write.scala | 252 +--------------------
.../paimon/spark/write/PaimonV2WriteBuilder.scala | 16 +-
.../paimon/spark/write/PaimonWriteBuilder.scala | 1 -
.../paimon/spark/sql/DeleteFromTableTestBase.scala | 24 ++
19 files changed, 406 insertions(+), 518 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 6f16a972c2..24baa2431f 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -96,7 +96,7 @@ under the License.
<td><h5>write.use-v2-write</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
- <td>If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.</td>
+ <td>If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE.</td>
</tr>
</tbody>
</table>
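
[Editor's note] For reference, a minimal sketch of turning the option on. The SparkConf form mirrors the tests added below in this patch; the runtime SET form assumes Paimon's usual spark.paimon.* session-conf prefixing and is not part of the patch.

    import org.apache.spark.SparkConf

    // Session-level, as done in V2DeleteFromTableTest below.
    val conf = new SparkConf().set("spark.paimon.write.use-v2-write", "true")

    // Assumed runtime equivalent inside an existing session (not shown in this patch):
    // spark.sql("SET spark.paimon.write.use-v2-write=true")
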
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 09554a1dbf..ab33a40e59 100644
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -18,4 +18,12 @@
package org.apache.paimon.spark.sql
+import org.apache.spark.SparkConf
+
class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
deleted file mode 100644
index 947fe503b1..0000000000
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.sql
-
-import org.apache.spark.SparkConf
-
-class V2DeleteFromTableTest extends DeleteFromTableTestBase {
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .set("spark.paimon.write.use-v2-write", "true")
- }
-}
diff --git
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 09554a1dbf..ab33a40e59 100644
---
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -18,4 +18,12 @@
package org.apache.paimon.spark.sql
+import org.apache.spark.SparkConf
+
class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 05630caceb..13305637ee 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -51,7 +51,7 @@ public class SparkConnectorOptions {
.booleanType()
.defaultValue(false)
.withDescription(
- "If true, v2 write will be used. Currently, only
HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1
write for other bucket modes. Currently, Spark V2 write does not support
TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.");
+ "If true, v2 write will be used. Currently, only
HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1
write for other bucket modes. Currently, Spark V2 write does not support
TableCapability.STREAMING_WRITE.");
public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
key("read.stream.maxFilesPerTrigger")
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
deleted file mode 100644
index b939ba8ef5..0000000000
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.BucketFunctionType
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.catalog.functions.BucketFunction
-import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
-import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.PaimonV2WriteBuilder
-import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
-import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED,
POSTPONE_MODE}
-
-import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
RowLevelOperation, RowLevelOperationInfo, WriteBuilder}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-class PaimonSparkCopyOnWriteOperation(table: Table, info:
RowLevelOperationInfo)
- extends RowLevelOperation {
-
- private lazy val coreOptions = new CoreOptions(table.options())
-
- private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
-
- private lazy val useV2Write: Boolean = {
- val v2WriteConfigured = OptionUtils.useV2Write()
- v2WriteConfigured && supportsV2Write
- }
-
- private def supportsV2Write: Boolean = {
- coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
- table match {
- case storeTable: FileStoreTable =>
- storeTable.bucketMode() match {
- case HASH_FIXED => BucketFunction.supportsTable(storeTable)
- case BUCKET_UNAWARE | POSTPONE_MODE => true
- case _ => false
- }
-
- case _ => false
- }
- } && coreOptions.clusteringColumns().isEmpty
- }
-
- override def command(): RowLevelOperation.Command = info.command()
-
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
- table match {
- case t: InnerTable =>
- new
PaimonScanBuilder(t.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable]) {
- override def build(): Scan = {
- val scan =
- PaimonCopyOnWriteScan(t, requiredSchema, pushedPartitionFilters,
pushedDataFilters)
- PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
- scan
- }
- }
- case _ =>
- throw new UnsupportedOperationException(
- s"Scan is only supported for InnerTable. " +
- s"Actual table type:
${Option(table).map(_.getClass.getSimpleName).getOrElse("null")}"
- )
- }
- }
-
- override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
- table match {
- case fileStoreTable: FileStoreTable if useV2Write =>
- val options = Options.fromMap(info.options)
- val builder = new PaimonV2WriteBuilder(fileStoreTable, info.schema(),
options)
- builder.overwriteFiles(copyOnWriteScan)
- case _ =>
- throw new UnsupportedOperationException(
- s"Write operation is only supported for FileStoreTable with V2 write
enabled. " +
- s"Actual table type: ${table.getClass.getSimpleName}, useV2Write:
$useV2Write"
- )
- }
- }
-
- override def requiredMetadataAttributes(): Array[NamedReference] = {
- val attributes: Seq[NamedReference] = Seq.empty
- val updatedAttributes = attributes :+ Expressions.column(FILE_PATH_COLUMN)
- updatedAttributes.toArray
- }
-
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
deleted file mode 100644
index 57f805cb13..0000000000
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.table.Table
-
-import org.apache.spark.sql.connector.write.{RowLevelOperation,
RowLevelOperationBuilder, RowLevelOperationInfo}
-
-class PaimonSparkRowLevelOperationBuilder(table: Table, info:
RowLevelOperationInfo)
- extends RowLevelOperationBuilder {
-
- override def build(): RowLevelOperation = {
- new PaimonSparkCopyOnWriteOperation(table, info)
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index f0e8f382fe..740e7b5994 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -18,7 +18,8 @@
package org.apache.paimon.spark
-import org.apache.paimon.table.Table
+import org.apache.paimon.spark.rowops.PaimonSparkCopyOnWriteOperation
+import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder,
RowLevelOperationInfo}
@@ -27,9 +28,17 @@ import
org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelO
case class SparkTable(override val table: Table)
extends PaimonSparkTableBase(table)
with SupportsRowLevelOperations {
+
override def newRowLevelOperationBuilder(
- rowLevelOperationInfo: RowLevelOperationInfo): RowLevelOperationBuilder
= {
- new PaimonSparkRowLevelOperationBuilder(table, rowLevelOperationInfo)
+ info: RowLevelOperationInfo): RowLevelOperationBuilder = {
+ table match {
+ case t: FileStoreTable if useV2Write =>
+ () => new PaimonSparkCopyOnWriteOperation(t, info)
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Write operation is only supported for FileStoreTable with V2 write
enabled. " +
+ s"Actual table type: ${table.getClass.getSimpleName}, useV2Write:
$useV2Write")
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 15781cff90..46b4cc05b4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -22,41 +22,25 @@ import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable,
LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
-import scala.collection.JavaConverters._
-
object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
- /**
- * Determines if DataSourceV2 delete is not supported for the given table. DataSourceV2 delete is
- * not supported in the following scenarios:
- * - Spark version is 3.4 or earlier
- * - Table does not use V2 write
- * - Row tracking is enabled
- * - Deletion vectors are enabled
- * - Table has primary keys defined
- * - Table is not a FileStoreTable
- * - Data evolution is enabled
- */
- private def shouldFallbackToV1Delete(table: SparkTable): Boolean = {
+ /** Determines if DataSourceV2 delete is not supported for the given table. */
+ private def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
val baseTable = table.getTable
-
- val baseConditions = org.apache.spark.SPARK_VERSION <= "3.4" ||
- !table.useV2Write ||
- table.coreOptions.rowTrackingEnabled() ||
- table.coreOptions.deletionVectorsEnabled() ||
- !baseTable.primaryKeys().isEmpty
-
- baseConditions || {
- baseTable match {
- case paimonTable: FileStoreTable =>
- paimonTable.coreOptions().dataEvolutionEnabled()
- case _ =>
- true
- }
- }
+ org.apache.spark.SPARK_VERSION < "3.5" ||
+ !baseTable.isInstanceOf[FileStoreTable] ||
+ !baseTable.primaryKeys().isEmpty ||
+ !table.useV2Write ||
+ table.coreOptions.deletionVectorsEnabled() ||
+ table.coreOptions.rowTrackingEnabled() ||
+ table.coreOptions.dataEvolutionEnabled() ||
+ // todo: Optimize v2 delete when conditions are all partition filters
+ condition == null || condition == TrueLiteral
}
override val operation: RowLevelOp = Delete
@@ -64,7 +48,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with
RowLevelHelper {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
case d @ DeleteFromTable(PaimonRelation(table), condition)
- if d.resolved && shouldFallbackToV1Delete(table) =>
+ if d.resolved && shouldFallbackToV1Delete(table, condition) =>
checkPaimonTable(table.getTable)
table.getTable match {
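
[Editor's note] A hedged illustration of the rewritten rule, assuming Spark 3.5+, an append-only (non-primary-key) FileStoreTable, deletion vectors off, and write.use-v2-write enabled; the table name T is only for the example. A DELETE with a real predicate stays on the DSv2 copy-on-write path, while an unconditional DELETE (condition null or literal true) still falls back to the v1 command.

    // Assumes a SparkSession `spark` with the Paimon catalog configured and
    // spark.paimon.write.use-v2-write=true (hypothetical table name T).
    spark.sql("CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)")
    spark.sql("INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2025')")

    // Real predicate: shouldFallbackToV1Delete returns false, so the DSv2
    // copy-on-write path (PaimonSparkCopyOnWriteOperation) handles the delete.
    spark.sql("DELETE FROM T WHERE dt = '2024'")

    // No predicate (condition is null / TrueLiteral): still falls back to the
    // v1 DeleteFromPaimonTableCommand.
    spark.sql("DELETE FROM T")
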
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 1669cd8353..fb53fafa25 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -92,19 +92,16 @@ case class PaimonFormatTableWriterBuilder(table:
FormatTable, writeSchema: Struc
private case class FormatTableBatchWrite(
table: FormatTable,
- overwriteDynamic: Boolean,
+ overwriteDynamic: Option[Boolean],
overwritePartitions: Option[Map[String, String]],
writeSchema: StructType)
extends BatchWrite
with Logging {
- assert(
- !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
- "Cannot overwrite dynamically and by filter both")
-
private val batchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
- if (overwriteDynamic) {
+ // todo: add test for static overwrite the whole table
+ if (overwriteDynamic.contains(true)) {
builder.withOverwrite()
} else {
overwritePartitions.foreach(partitions =>
builder.withOverwrite(partitions.asJava))
@@ -112,8 +109,9 @@ private case class FormatTableBatchWrite(
builder
}
- override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory =
- FormatTableWriterFactory(batchWriteBuilder, writeSchema)
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
+ (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder,
writeSchema)
+ }
override def useCommitCoordinator(): Boolean = false
@@ -140,16 +138,6 @@ private case class FormatTableBatchWrite(
}
}
-private case class FormatTableWriterFactory(
- batchWriteBuilder: BatchWriteBuilder,
- writeSchema: StructType)
- extends DataWriterFactory {
-
- override def createWriter(partitionId: Int, taskId: Long):
DataWriter[InternalRow] = {
- new FormatTableDataWriter(batchWriteBuilder, writeSchema)
- }
-}
-
private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder,
writeSchema: StructType)
extends V2DataWrite
with Logging {
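
[Editor's note] Dropping FormatTableWriterFactory in favor of a lambda relies on DataWriterFactory being a single-abstract-method Java interface (createWriter(partitionId, taskId)), so Scala's SAM conversion applies. A standalone sketch of the same pattern, with a hypothetical `newWriter` supplier:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}

    // The factory is serialized and shipped to executors, so anything the lambda
    // captures (here the hypothetical `newWriter` supplier) must be serializable.
    def lambdaFactory(newWriter: () => DataWriter[InternalRow]): DataWriterFactory =
      (_: Int, _: Long) => newWriter()
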
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
similarity index 68%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 3b4efbde0d..033d9eb569 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -16,12 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.scan
+package org.apache.paimon.spark.rowops
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.spark.commands.SparkDataFileMeta
+import
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.spark.scan.BaseScan
import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
-import org.apache.paimon.table.{FileStoreTable, InnerTable}
+import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.spark.sql.PaimonUtils
@@ -37,7 +40,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
case class PaimonCopyOnWriteScan(
- table: InnerTable,
+ table: FileStoreTable,
requiredSchema: StructType,
pushedPartitionFilters: Seq[PartitionPredicate],
pushedDataFilters: Seq[Predicate])
@@ -45,15 +48,21 @@ case class PaimonCopyOnWriteScan(
with SupportsRuntimeV2Filtering {
override def inputSplits: Array[Split] =
dataSplits.asInstanceOf[Array[Split]]
- private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
var dataSplits: Array[DataSplit] = Array()
+ def scannedFiles: Seq[SparkDataFileMeta] = {
+ dataSplits
+ .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit,
dataSplit.totalBuckets()))
+ .toSeq
+ }
+
override def filterAttributes(): Array[NamedReference] = {
Array(Expressions.column(FILE_PATH_COLUMN))
}
override def filter(predicates: Array[SparkPredicate]): Unit = {
+ val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
val runtimefilters: Array[Filter] =
predicates.flatMap(PaimonUtils.filterV2ToV1)
for (filter <- runtimefilters) {
filter match {
@@ -66,23 +75,17 @@ case class PaimonCopyOnWriteScan(
}
}
- table match {
- case fileStoreTable: FileStoreTable =>
- val snapshotReader = fileStoreTable.newSnapshotReader()
- if (fileStoreTable.coreOptions().manifestDeleteFileDropStats()) {
- snapshotReader.dropStats()
- }
- if (pushedPartitionFilters.nonEmpty) {
-
snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
- }
- if (pushedDataFilters.nonEmpty) {
-
snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
- }
- snapshotReader.withDataFileNameFilter(fileName =>
filteredFileNames.contains(fileName))
- dataSplits =
- snapshotReader.read().splits().asScala.collect { case s: DataSplit
=> s }.toArray
-
- case _ => throw new RuntimeException("Only FileStoreTable support.")
+ val snapshotReader = table.newSnapshotReader()
+ if (table.coreOptions().manifestDeleteFileDropStats()) {
+ snapshotReader.dropStats()
+ }
+ if (pushedPartitionFilters.nonEmpty) {
+
snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+ }
+ if (pushedDataFilters.nonEmpty) {
+ snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
}
+ snapshotReader.withDataFileNameFilter(fileName =>
filteredFileNames.contains(fileName))
+ dataSplits = snapshotReader.read().splits().asScala.collect { case s:
DataSplit => s }.toArray
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
new file mode 100644
index 0000000000..e415e5cbf7
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.rowops
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.PaimonBaseScanBuilder
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.spark.write.PaimonV2WriteBuilder
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo,
RowLevelOperation, RowLevelOperationInfo, WriteBuilder}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info:
RowLevelOperationInfo)
+ extends RowLevelOperation {
+
+ private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
+ override def command(): RowLevelOperation.Command = info.command()
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
+ new PaimonBaseScanBuilder {
+ override lazy val table: FileStoreTable =
+
PaimonSparkCopyOnWriteOperation.this.table.copy(options.asCaseSensitiveMap)
+
+ override def build(): Scan = {
+ val scan =
+ PaimonCopyOnWriteScan(table, requiredSchema, pushedPartitionFilters,
pushedDataFilters)
+ PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
+ scan
+ }
+ }
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ val options = Options.fromMap(info.options)
+ val builder = new PaimonV2WriteBuilder(table, info.schema(), options)
+ assert(copyOnWriteScan.isDefined)
+ builder.overwriteFiles(copyOnWriteScan.get)
+ }
+
+ override def requiredMetadataAttributes(): Array[NamedReference] = {
+ Array(Expressions.column(FILE_PATH_COLUMN))
+ }
+}
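
[Editor's note] For orientation, a rough sketch of the order in which Spark drives this operation during a v2 DELETE. The names storeTable, info and writeInfo are placeholders normally supplied by Spark's planner, not part of the patch; the point is that the scan side is built and captured first, runtime filtering narrows it to the touched files, and only then does newWriteBuilder hand that scan to overwriteFiles.

    import org.apache.paimon.spark.rowops.PaimonSparkCopyOnWriteOperation
    import org.apache.paimon.table.FileStoreTable
    import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperationInfo}
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    def planV2Delete(
        storeTable: FileStoreTable,
        info: RowLevelOperationInfo,
        writeInfo: LogicalWriteInfo): Unit = {
      val op = new PaimonSparkCopyOnWriteOperation(storeTable, info)

      // 1. The scan side is planned first; build() records the PaimonCopyOnWriteScan
      //    inside the operation.
      val scan = op.newScanBuilder(CaseInsensitiveStringMap.empty()).build()

      // 2. Spark's runtime filtering on _file_path later narrows that scan to the
      //    files that actually contain matching rows.
      // 3. The write side then rewrites exactly those files: newWriteBuilder asserts
      //    the captured scan exists and passes it to overwriteFiles.
      val write = op.newWriteBuilder(writeInfo).build()
    }
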
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
index dda04245cf..3d37e7efca 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.write
-import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
import org.apache.paimon.table.Table
import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite,
SupportsOverwrite, WriteBuilder}
@@ -30,48 +30,35 @@ abstract class BaseV2WriteBuilder(table: Table)
with SupportsOverwrite
with SupportsDynamicOverwrite {
- protected var overwriteDynamic = false
+ protected var overwriteDynamic: Option[Boolean] = None
protected var overwritePartitions: Option[Map[String, String]] = None
-
- protected var isOverwriteFiles = false
protected var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
- def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): WriteBuilder = {
- this.isOverwriteFiles = true
- this.copyOnWriteScan = scan
+ def overwriteFiles(copyOnWriteScan: PaimonCopyOnWriteScan): WriteBuilder = {
+ assert(overwriteDynamic.isEmpty && overwritePartitions.isEmpty)
+ this.copyOnWriteScan = Some(copyOnWriteScan)
this
}
override def overwrite(filters: Array[Filter]): WriteBuilder = {
- if (overwriteDynamic) {
- throw new IllegalArgumentException("Cannot overwrite dynamically and by
filter both")
- }
-
+ assert(overwriteDynamic.isEmpty && copyOnWriteScan.isEmpty)
failIfCanNotOverwrite(filters)
- val conjunctiveFilters = if (filters.nonEmpty) {
- Some(filters.reduce((l, r) => And(l, r)))
+ overwriteDynamic = Some(false)
+ val conjunctiveFilters = filters.reduce((l, r) => And(l, r))
+ if (isTruncate(conjunctiveFilters)) {
+ overwritePartitions = Some(Map.empty[String, String])
} else {
- None
+ overwritePartitions = Some(
+ convertPartitionFilterToMap(conjunctiveFilters, partitionRowType()))
}
-
- if (isTruncate(conjunctiveFilters.get)) {
- overwritePartitions = Option.apply(Map.empty[String, String])
- } else {
- overwritePartitions =
- Option.apply(convertPartitionFilterToMap(conjunctiveFilters.get,
partitionRowType()))
- }
-
this
}
override def overwriteDynamicPartitions(): WriteBuilder = {
- if (overwritePartitions.exists(_.nonEmpty)) {
- throw new IllegalArgumentException("Cannot overwrite dynamically and by
filter both")
- }
-
- overwriteDynamic = true
- overwritePartitions = Option.apply(Map.empty[String, String])
+ assert(overwritePartitions.isEmpty && copyOnWriteScan.isEmpty)
+ overwriteDynamic = Some(true)
+ overwritePartitions = Some(Map.empty[String, String])
this
}
}
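
[Editor's note] As context, a rough mapping (an assumption based on standard Spark DSv2 planning, not stated in this patch) from SQL statements to the three mutually exclusive entry points above, for a hypothetical partitioned table t with v2 write enabled:

    // Hypothetical table `t (id INT, dt STRING) PARTITIONED BY (dt)` in a session
    // `spark` with spark.paimon.write.use-v2-write=true.

    // overwrite(filters): static INSERT OVERWRITE of a single partition (becomes a
    // whole-table truncate when the filters reduce to "overwrite everything").
    spark.sql("INSERT OVERWRITE t PARTITION (dt = '2024') SELECT 3 AS id")

    // overwriteDynamicPartitions(): INSERT OVERWRITE under dynamic overwrite mode.
    spark.sql("SET spark.sql.sources.partitionOverwriteMode=dynamic")
    spark.sql("INSERT OVERWRITE t VALUES (4, '2025')")

    // overwriteFiles(scan): set by PaimonSparkCopyOnWriteOperation for row-level DELETE.
    spark.sql("DELETE FROM t WHERE id = 4")
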
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
new file mode 100644
index 0000000000..589ba17451
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.commands.SparkDataFileMeta
+import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl}
+
+import org.apache.spark.sql.PaimonSparkSession
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.types.StructType
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+case class PaimonBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+ extends BatchWrite
+ with WriteHelper {
+
+ protected val metricRegistry = SparkMetricRegistry()
+
+ protected val batchWriteBuilder: BatchWriteBuilder = {
+ val builder = table.newBatchWriteBuilder()
+ overwritePartitions.foreach(partitions =>
builder.withOverwrite(partitions.asJava))
+ builder
+ }
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
+ val fullCompactionDeltaCommits: Option[Int] =
+ Option.apply(coreOptions.fullCompactionDeltaCommits())
+ (_: Int, _: Long) => {
+ PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema,
fullCompactionDeltaCommits)
+ }
+ }
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Committing to table ${table.name()}")
+ val batchTableCommit = batchWriteBuilder.newCommit()
+ batchTableCommit.withMetricRegistry(metricRegistry)
+ val addCommitMessage = WriteTaskResult.merge(messages)
+ val deletedCommitMessage = copyOnWriteScan match {
+ case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles)
+ case None => Seq.empty
+ }
+ val commitMessages = addCommitMessage ++ deletedCommitMessage
+ try {
+ val start = System.currentTimeMillis()
+ batchTableCommit.commit(commitMessages.asJava)
+ logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+ } finally {
+ batchTableCommit.close()
+ }
+ postDriverMetrics()
+ postCommit(commitMessages)
+ }
+
+ // Spark support v2 write driver metrics since 4.0, see https://github.com/apache/spark/pull/48573
+ // To ensure compatibility with 3.x, manually post driver metrics here instead of using Spark's API.
+ protected def postDriverMetrics(): Unit = {
+ val spark = PaimonSparkSession.active
+ // todo: find a more suitable way to get metrics.
+ val commitMetrics = metricRegistry.buildSparkCommitMetrics()
+ val executionId =
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ val executionMetrics = Compatibility.getExecutionMetrics(spark,
executionId.toLong).distinct
+ val metricUpdates = executionMetrics.flatMap {
+ m =>
+ commitMetrics.find(x =>
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
+ case Some(customTaskMetric) => Some((m.accumulatorId,
customTaskMetric.value()))
+ case None => None
+ }
+ }
+ SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext,
executionId, metricUpdates)
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ // TODO clean uncommitted files
+ }
+
+ private def buildDeletedCommitMessage(
+ deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
+ deletedFiles
+ .groupBy(f => (f.partition, f.bucket))
+ .map {
+ case ((partition, bucket), files) =>
+ val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ files.head.totalBuckets,
+ new DataIncrement(
+ Collections.emptyList[DataFileMeta],
+ deletedDataFileMetas,
+ Collections.emptyList[DataFileMeta]),
+ new CompactIncrement(
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta])
+ )
+ }
+ .toSeq
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
new file mode 100644
index 0000000000..d5291fe5d8
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
TableWriteImpl}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class PaimonV2DataWriter(
+ writeBuilder: BatchWriteBuilder,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ fullCompactionDeltaCommits: Option[Int],
+ batchId: Option[Long] = None)
+ extends abstractInnerTableDataWrite[InternalRow]
+ with InnerTableV2DataWrite {
+
+ private val ioManager = SparkUtils.createIOManager()
+
+ private val metricRegistry = SparkMetricRegistry()
+
+ val write: TableWriteImpl[InternalRow] = {
+ writeBuilder
+ .newWrite()
+ .withIOManager(ioManager)
+ .withMetricRegistry(metricRegistry)
+ .asInstanceOf[TableWriteImpl[InternalRow]]
+ }
+
+ private val rowConverter: InternalRow => SparkInternalRowWrapper = {
+ val numFields = writeSchema.fields.length
+ val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema,
dataSchema, numFields)
+ record => reusableWrapper.replace(record)
+ }
+
+ override def write(record: InternalRow): Unit = {
+ postWrite(write.writeAndReturn(rowConverter.apply(record)))
+ }
+
+ override def commitImpl(): Seq[CommitMessage] = {
+ write.prepareCommit().asScala.toSeq
+ }
+
+ override def abort(): Unit = close()
+
+ override def close(): Unit = {
+ try {
+ write.close()
+ ioManager.close()
+ } catch {
+ case e: Exception => throw new RuntimeException(e)
+ }
+ }
+
+ override def currentMetricsValues(): Array[CustomTaskMetric] = {
+ metricRegistry.buildSparkWriteMetrics()
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 5a46e208f5..e2ead2f726 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -18,38 +18,23 @@
package org.apache.paimon.spark.write
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
import org.apache.paimon.options.Options
import org.apache.paimon.spark._
-import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.commands.{SchemaHelper, SparkDataFileMeta}
-import
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
-import org.apache.paimon.spark.metric.SparkMetricRegistry
-import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl, TableWriteImpl}
-import org.apache.paimon.table.source.DataSplit
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.PaimonSparkSession
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.distributions.Distribution
import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
class PaimonV2Write(
override val originTable: FileStoreTable,
- overwriteDynamic: Boolean,
overwritePartitions: Option[Map[String, String]],
+ copyOnWriteScan: Option[PaimonCopyOnWriteScan],
dataSchema: StructType,
options: Options
) extends Write
@@ -57,25 +42,7 @@ class PaimonV2Write(
with SchemaHelper
with Logging {
- assert(
- !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
- "Cannot overwrite dynamically and by filter both")
-
- private var isOverwriteFiles = false
-
- private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
-
private val writeSchema = mergeSchema(dataSchema, options)
-
- def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): PaimonV2Write = {
- this.isOverwriteFiles = true
- this.copyOnWriteScan = scan
- this
- }
-
- updateTableWithOptions(
- Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key ->
overwriteDynamic.toString))
-
private val writeRequirement = PaimonWriteRequirement(table)
override def requiredDistribution(): Distribution = {
@@ -91,11 +58,7 @@ class PaimonV2Write(
}
override def toBatch: BatchWrite = {
- if (isOverwriteFiles) {
- CopyOnWriteBatchWrite(table, writeSchema, dataSchema,
overwritePartitions, copyOnWriteScan)
- } else {
- PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
- }
+ PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions,
copyOnWriteScan)
}
override def supportedCustomMetrics(): Array[CustomMetric] = {
@@ -113,6 +76,7 @@ class PaimonV2Write(
}
override def toString: String = {
+ val overwriteDynamic = table.coreOptions().dynamicPartitionOverwrite()
val overwriteDynamicStr = if (overwriteDynamic) {
", overwriteDynamic=true"
} else {
@@ -120,211 +84,11 @@ class PaimonV2Write(
}
val overwritePartitionsStr = overwritePartitions match {
case Some(partitions) if partitions.nonEmpty => s",
overwritePartitions=$partitions"
- case Some(_) => ", overwriteTable=true"
- case None => ""
+ case Some(_) if !overwriteDynamic => ", overwriteTable=true"
+ case _ => ""
}
s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
}
override def description(): String = toString
}
-
-private case class PaimonBatchWrite(
- table: FileStoreTable,
- writeSchema: StructType,
- dataSchema: StructType,
- overwritePartitions: Option[Map[String, String]])
- extends PaimonBatchWriteBase(table, writeSchema, dataSchema,
overwritePartitions) {}
-
-abstract class PaimonBatchWriteBase(
- table: FileStoreTable,
- writeSchema: StructType,
- dataSchema: StructType,
- overwritePartitions: Option[Map[String, String]])
- extends BatchWrite
- with WriteHelper {
-
- protected val metricRegistry = SparkMetricRegistry()
-
- protected val batchWriteBuilder = {
- val builder = table.newBatchWriteBuilder()
- overwritePartitions.foreach(partitions =>
builder.withOverwrite(partitions.asJava))
- builder
- }
-
- override def createBatchWriterFactory(info: PhysicalWriteInfo):
DataWriterFactory = {
- val fullCompactionDeltaCommits: Option[Int] =
- Option.apply(coreOptions.fullCompactionDeltaCommits())
- WriterFactory(writeSchema, dataSchema, batchWriteBuilder,
fullCompactionDeltaCommits)
- }
-
- override def useCommitCoordinator(): Boolean = false
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- logInfo(s"Committing to table ${table.name()}")
- val batchTableCommit = batchWriteBuilder.newCommit()
- batchTableCommit.withMetricRegistry(metricRegistry)
- val commitMessages = WriteTaskResult.merge(messages)
- try {
- val start = System.currentTimeMillis()
- batchTableCommit.commit(commitMessages.asJava)
- logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
- } finally {
- batchTableCommit.close()
- }
- postDriverMetrics()
- postCommit(commitMessages)
- }
-
- override def abort(messages: Array[WriterCommitMessage]): Unit = {
- // TODO clean uncommitted files
- }
-
- // Spark support v2 write driver metrics since 4.0, see https://github.com/apache/spark/pull/48573
- // To ensure compatibility with 3.x, manually post driver metrics here instead of using Spark's API.
- protected def postDriverMetrics(): Unit = {
- val spark = PaimonSparkSession.active
- // todo: find a more suitable way to get metrics.
- val commitMetrics = metricRegistry.buildSparkCommitMetrics()
- val executionId =
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- val executionMetrics = Compatibility.getExecutionMetrics(spark,
executionId.toLong).distinct
- val metricUpdates = executionMetrics.flatMap {
- m =>
- commitMetrics.find(x =>
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
- case Some(customTaskMetric) => Some((m.accumulatorId,
customTaskMetric.value()))
- case None => None
- }
- }
- SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext,
executionId, metricUpdates)
- }
-}
-
-private case class CopyOnWriteBatchWrite(
- table: FileStoreTable,
- writeSchema: StructType,
- dataSchema: StructType,
- overwritePartitions: Option[Map[String, String]],
- scan: Option[PaimonCopyOnWriteScan])
- extends PaimonBatchWriteBase(table, writeSchema, dataSchema,
overwritePartitions) {
-
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- logInfo(s"CopyOnWrite committing to table ${table.name()}")
-
- val batchTableCommit = batchWriteBuilder.newCommit()
-
- try {
- if (scan.isEmpty) {
- batchTableCommit.truncateTable()
- } else {
- val touchedFiles = candidateFiles(scan.get.dataSplits)
- val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
- val addCommitMessages = WriteTaskResult.merge(messages)
- val commitMessages = addCommitMessages ++ deletedCommitMessage
-
- batchTableCommit.withMetricRegistry(metricRegistry)
- val start = System.currentTimeMillis()
- batchTableCommit.commit(commitMessages.asJava)
- logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
- postCommit(commitMessages)
- }
- } finally {
- batchTableCommit.close()
- postDriverMetrics()
- }
- }
-
- private def candidateFiles(candidateDataSplits: Seq[DataSplit]):
Array[SparkDataFileMeta] = {
- val totalBuckets = coreOptions.bucket()
- candidateDataSplits
- .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit,
totalBuckets))
- .toArray
- }
-
- private def buildDeletedCommitMessage(
- deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
- deletedFiles
- .groupBy(f => (f.partition, f.bucket))
- .map {
- case ((partition, bucket), files) =>
- val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
-
- new CommitMessageImpl(
- partition,
- bucket,
- files.head.totalBuckets,
- new DataIncrement(
- Collections.emptyList[DataFileMeta],
- deletedDataFileMetas,
- Collections.emptyList[DataFileMeta]),
- new CompactIncrement(
- Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta])
- )
- }
- .toSeq
- }
-}
-
-private case class WriterFactory(
- writeSchema: StructType,
- dataSchema: StructType,
- batchWriteBuilder: BatchWriteBuilder,
- fullCompactionDeltaCommits: Option[Int])
- extends DataWriterFactory {
-
- override def createWriter(partitionId: Int, taskId: Long):
DataWriter[InternalRow] = {
- PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema,
fullCompactionDeltaCommits)
- }
-}
-
-private case class PaimonV2DataWriter(
- writeBuilder: BatchWriteBuilder,
- writeSchema: StructType,
- dataSchema: StructType,
- fullCompactionDeltaCommits: Option[Int],
- batchId: Option[Long] = None)
- extends abstractInnerTableDataWrite[InternalRow]
- with InnerTableV2DataWrite {
-
- private val ioManager = SparkUtils.createIOManager()
-
- private val metricRegistry = SparkMetricRegistry()
-
- val write: TableWriteImpl[InternalRow] = {
- writeBuilder
- .newWrite()
- .withIOManager(ioManager)
- .withMetricRegistry(metricRegistry)
- .asInstanceOf[TableWriteImpl[InternalRow]]
- }
-
- private val rowConverter: InternalRow => SparkInternalRowWrapper = {
- val numFields = writeSchema.fields.length
- val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema,
dataSchema, numFields)
- record => reusableWrapper.replace(record)
- }
-
- override def write(record: InternalRow): Unit = {
- postWrite(write.writeAndReturn(rowConverter.apply(record)))
- }
-
- override def commitImpl(): Seq[CommitMessage] = {
- write.prepareCommit().asScala.toSeq
- }
-
- override def abort(): Unit = close()
-
- override def close(): Unit = {
- try {
- write.close()
- ioManager.close()
- } catch {
- case e: Exception => throw new RuntimeException(e)
- }
- }
-
- override def currentMetricsValues(): Array[CustomTaskMetric] = {
- metricRegistry.buildSparkWriteMetrics()
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index f8036ff40a..91f4f861ce 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -18,23 +18,25 @@
package org.apache.paimon.spark.write
+import org.apache.paimon.CoreOptions
import org.apache.paimon.options.Options
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.RowType
import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType,
options: Options)
extends BaseV2WriteBuilder(table) {
- override def build = {
- val paimonV2Write =
- new PaimonV2Write(table, overwriteDynamic, overwritePartitions,
dataSchema, options)
- if (isOverwriteFiles) {
- paimonV2Write.overwriteFiles(copyOnWriteScan)
- } else {
- paimonV2Write
+ override def build: PaimonV2Write = {
+ val finalTable = overwriteDynamic match {
+ case Some(o) =>
+ table.copy(Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key ->
o.toString).asJava)
+ case _ => table
}
+ new PaimonV2Write(finalTable, overwritePartitions, copyOnWriteScan,
dataSchema, options)
}
override def partitionRowType(): RowType =
table.schema().logicalPartitionType()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
index cfd4750d0b..0e6209a598 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
@@ -46,5 +46,4 @@ class PaimonWriteBuilder(table: FileStoreTable, options:
Options)
this.saveMode = Overwrite(conjunctiveFilters)
this
}
-
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 3113b905f8..9daa975cba 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -75,6 +75,30 @@ abstract class DeleteFromTableTestBase extends
PaimonSparkTestBase {
)
}
+ test(
+ s"Paimon Delete: append-only table, no match and full delete scenarios
with partitioned table") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED
BY (dt)
+ |""".stripMargin)
+
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'),
(4, 'd', '2025')
+ |""".stripMargin)
+
+ spark.sql("DELETE FROM T WHERE name = 'e'")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d",
"2025")).toDF()
+ )
+
+ spark.sql("DELETE FROM T")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ spark.emptyDataFrame
+ )
+ }
+
test(s"Paimon Delete: append-only table with partition") {
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED
BY (dt)