This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 81ceea9c09 [spark] Spark write supports 'full-compaction.delta-commits' (#6364)
81ceea9c09 is described below
commit 81ceea9c09fb95a706771d392006bc7f65992802
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 9 15:53:35 2025 +0800
[spark] Spark write supports 'full-compaction.delta-commits' (#6364)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 8 ++-
.../org/apache/paimon/schema/SchemaValidation.java | 4 +-
.../org/apache/paimon/spark/SparkTableWrite.scala | 22 +++---
.../paimon/spark/commands/PaimonSparkWriter.scala | 16 ++++-
.../spark/commands/WriteIntoPaimonTable.scala | 5 +-
.../apache/paimon/spark/sources/PaimonSink.scala | 4 +-
.../paimon/spark/write/DataWriteHelper.scala | 68 +++++++++++++++++++
.../apache/paimon/spark/write/PaimonV2Write.scala | 36 ++++++----
.../org/apache/paimon/spark/PaimonSinkTest.scala | 79 ++++++++++++++++++++++
10 files changed, 210 insertions(+), 34 deletions(-)
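
In short, as exercised by the PaimonSinkTest cases added below: on a primary-key table with 'full-compaction.delta-commits' set, a Spark batch write now triggers a full compaction with every commit, and a Spark streaming write triggers one every N micro-batches. A minimal sketch of the batch case, mirroring the added test (table name and values are illustrative):

    spark.sql("""
      CREATE TABLE t (a INT, b INT) TBLPROPERTIES (
        'primary-key' = 'a',
        'bucket' = '1',
        'full-compaction.delta-commits' = '1'
      )
    """)
    // With delta-commits = 1, each batch commit is followed by a full compaction,
    // so the latest snapshot's commitKind is expected to be COMPACT.
    spark.sql("INSERT INTO t VALUES (1, 1)")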
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f2e90e5dce..42c574d604 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -534,7 +534,7 @@ under the License.
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
- <td>Full compaction will be constantly triggered after delta commits.</td>
+ <td>For streaming write, full compaction will be constantly triggered after delta commits. For batch write, full compaction will be triggered with each commit as long as this value is greater than 0.</td>
</tr>
<tr>
<td><h5>ignore-delete</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e2046da926..44e4c48f34 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1204,7 +1204,8 @@ public class CoreOptions implements Serializable {
.intType()
.noDefaultValue()
.withDescription(
- "Full compaction will be constantly triggered
after delta commits.");
+ "For streaming write, full compaction will be
constantly triggered after delta commits. "
+ + "For batch write, full compaction will
be triggered with each commit as long as this value is greater than 0.");
@ExcludeFromDocumentation("Internal use only")
public static final ConfigOption<StreamScanMode> STREAM_SCAN_MODE =
@@ -2788,6 +2789,11 @@ public class CoreOptions implements Serializable {
return consumerId;
}
+ @Nullable
+ public Integer fullCompactionDeltaCommits() {
+ return options.get(FULL_COMPACTION_DELTA_COMMITS);
+ }
+
public static StreamingReadMode streamReadType(Options options) {
return options.get(STREAMING_READ_MODE);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 905d8ad647..a05d198ffa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -60,7 +60,6 @@ import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
@@ -563,8 +562,7 @@ public class SchemaValidation {
"Cannot define 'bucket-key' with bucket = -1, please
remove the 'bucket-key' setting or specify a bucket number.");
}
- if (schema.primaryKeys().isEmpty()
- && options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
+ if (schema.primaryKeys().isEmpty() && options.fullCompactionDeltaCommits() != null) {
throw new RuntimeException(
"AppendOnlyTable of unaware or dynamic bucket does not
support 'full-compaction.delta-commits'");
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 704a6c2d5b..5169e6ba76 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -20,11 +20,11 @@ package org.apache.paimon.spark
import org.apache.paimon.disk.IOManager
import org.apache.paimon.spark.util.SparkRowUtils
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer}
+import org.apache.paimon.spark.write.DataWriteHelper
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer, TableWriteImpl}
import org.apache.paimon.types.RowType
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{PaimonUtils, Row}
+import org.apache.spark.sql.Row
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
@@ -33,18 +33,21 @@ case class SparkTableWrite(
writeBuilder: BatchWriteBuilder,
writeType: RowType,
rowKindColIdx: Int = -1,
- writeRowTracking: Boolean = false)
- extends SparkTableWriteTrait {
+ writeRowTracking: Boolean = false,
+ fullCompactionDeltaCommits: Option[Int],
+ batchId: Long)
+ extends SparkTableWriteTrait
+ with DataWriteHelper {
private val ioManager: IOManager = SparkUtils.createIOManager
- private val write: BatchTableWrite = {
+ val write: TableWriteImpl[Row] = {
val _write = writeBuilder.newWrite()
_write.withIOManager(ioManager)
if (writeRowTracking) {
_write.withWriteType(writeType)
}
- _write
+ _write.asInstanceOf[TableWriteImpl[Row]]
}
private val toPaimonRow = {
@@ -52,14 +55,15 @@ case class SparkTableWrite(
}
def write(row: Row): Unit = {
- write.write(toPaimonRow(row))
+ postWrite(write.writeAndReturn(toPaimonRow(row)))
}
def write(row: Row, bucket: Int): Unit = {
- write.write(toPaimonRow(row), bucket)
+ postWrite(write.writeAndReturn(toPaimonRow(row), bucket))
}
def finish(): Iterator[Array[Byte]] = {
+ preFinish()
var bytesWritten = 0L
var recordsWritten = 0L
val commitMessages = new ListBuffer[Array[Byte]]()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 6d0563b364..4a8392f55f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -51,13 +51,19 @@ import java.util.Collections.singletonMap
import scala.collection.JavaConverters._
-case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = false)
+case class PaimonSparkWriter(
+ table: FileStoreTable,
+ writeRowTracking: Boolean = false,
+ batchId: Long = -1)
extends WriteHelper {
private lazy val tableSchema = table.schema
private lazy val bucketMode = table.bucketMode
+ private val fullCompactionDeltaCommits: Option[Int] = Option.apply(coreOptions.fullCompactionDeltaCommits())
+
@transient private lazy val serializer = new CommitMessageSerializer
private val writeType = {
@@ -98,7 +104,13 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, BUCKET_COL)
val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
- def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, writeRowTracking)
+ def newWrite() = SparkTableWrite(
+ writeBuilder,
+ writeType,
+ rowKindColIdx,
+ writeRowTracking,
+ fullCompactionDeltaCommits,
+ batchId)
def sparkParallelism = {
val defaultParallelism = sparkSession.sparkContext.defaultParallelism
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 5a9b36e269..2056590ead 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -36,7 +36,8 @@ case class WriteIntoPaimonTable(
override val originTable: FileStoreTable,
saveMode: SaveMode,
_data: DataFrame,
- options: Options)
+ options: Options,
+ batchId: Long = -1)
extends RunnableCommand
with ExpressionHelper
with SchemaHelper
@@ -50,7 +51,7 @@ case class WriteIntoPaimonTable(
updateTableWithOptions(
Map(DYNAMIC_PARTITION_OVERWRITE.key -> dynamicPartitionOverwriteMode.toString))
- val writer = PaimonSparkWriter(table)
+ val writer = PaimonSparkWriter(table, batchId = batchId)
if (overwritePartition != null) {
writer.writeBuilder.withOverwrite(overwritePartition.asJava)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 3387a536ab..1c51208089 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -43,8 +43,8 @@ class PaimonSink(
} else {
InsertInto
}
- partitionColumns.foreach(println)
val newData = PaimonUtils.createNewDataFrame(data)
- WriteIntoPaimonTable(originTable, saveMode, newData, options).run(sqlContext.sparkSession)
+ WriteIntoPaimonTable(originTable, saveMode, newData, options, batchId).run(
+ sqlContext.sparkSession)
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
new file mode 100644
index 0000000000..6334eebe69
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.table.sink.{BatchTableWrite, SinkRecord}
+
+import org.apache.spark.internal.Logging
+
+import scala.collection.mutable
+
+trait DataWriteHelper extends Logging {
+
+ val write: BatchTableWrite
+
+ val fullCompactionDeltaCommits: Option[Int]
+
+ /**
+ * For batch write, batchId is -1, for streaming write, batchId is the current batch id (>= 0).
+ */
+ val batchId: Long
+
+ private val needFullCompaction: Boolean = {
+ fullCompactionDeltaCommits match {
+ case Some(deltaCommits) =>
+ deltaCommits > 0 && (batchId == -1 || (batchId + 1) % deltaCommits == 0)
+ case None => false
+ }
+ }
+
+ private val writtenBuckets = mutable.Set[(BinaryRow, Integer)]()
+
+ def postWrite(record: SinkRecord): Unit = {
+ if (record == null) {
+ return
+ }
+
+ if (needFullCompaction && !writtenBuckets.contains((record.partition(), record.bucket()))) {
+ writtenBuckets.add((record.partition().copy(), record.bucket()))
+ }
+ }
+
+ def preFinish(): Unit = {
+ if (needFullCompaction && writtenBuckets.nonEmpty) {
+ logInfo("Start to compact buckets: " + writtenBuckets)
+ writtenBuckets.foreach(
+ (bucket: (BinaryRow, Integer)) => {
+ write.compact(bucket._1, bucket._2, true)
+ })
+ }
+ }
+}
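
The trigger condition above reads as follows: compaction is requested only when the configured delta-commits is positive, and either the write is a batch write (batchId == -1) or the streaming micro-batch id falls on a delta-commits boundary. A standalone sketch of that check, with an illustrative function name that is not part of the patch:

    // Re-statement of DataWriteHelper's needFullCompaction condition.
    def shouldFullCompact(deltaCommitsOpt: Option[Int], batchId: Long): Boolean =
      deltaCommitsOpt match {
        case Some(deltaCommits) =>
          deltaCommits > 0 && (batchId == -1 || (batchId + 1) % deltaCommits == 0)
        case None => false
      }

    // Batch write (batchId = -1): any positive setting compacts on every commit.
    assert(shouldFullCompact(Some(1), -1))
    // Streaming write with delta-commits = 2: micro-batches 1, 3, 5, ... compact.
    assert(!shouldFullCompact(Some(2), 0) && shouldFullCompact(Some(2), 1))
    // Option unset: the write path never requests a full compaction.
    assert(!shouldFullCompact(None, 3))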
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 9eaa1bf72f..62a383eb7c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.options.Options
import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
import org.apache.paimon.spark.commands.SchemaHelper
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessage, CommitMessageSerializer}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageSerializer, TableWriteImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
@@ -103,8 +103,11 @@ private case class PaimonBatchWrite(
builder
}
- override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
- WriterFactory(writeSchema, dataSchema, batchWriteBuilder)
+ override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+ val fullCompactionDeltaCommits: Option[Int] = Option.apply(coreOptions.fullCompactionDeltaCommits())
+ WriterFactory(writeSchema, dataSchema, batchWriteBuilder, fullCompactionDeltaCommits)
+ }
override def useCommitCoordinator(): Boolean = false
@@ -139,23 +142,27 @@ private case class PaimonBatchWrite(
private case class WriterFactory(
writeSchema: StructType,
dataSchema: StructType,
- batchWriteBuilder: BatchWriteBuilder)
+ batchWriteBuilder: BatchWriteBuilder,
+ fullCompactionDeltaCommits: Option[Int])
extends DataWriterFactory {
override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
- val batchTableWrite = batchWriteBuilder.newWrite()
- new PaimonDataWriter(batchTableWrite, writeSchema, dataSchema)
+ val batchTableWrite = batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]]
+ PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, fullCompactionDeltaCommits)
}
}
-private class PaimonDataWriter(
- batchTableWrite: BatchTableWrite,
+private case class PaimonDataWriter(
+ write: TableWriteImpl[InternalRow],
writeSchema: StructType,
- dataSchema: StructType)
- extends DataWriter[InternalRow] {
+ dataSchema: StructType,
+ fullCompactionDeltaCommits: Option[Int],
+ batchId: Long = -1)
+ extends DataWriter[InternalRow]
+ with DataWriteHelper {
private val ioManager = SparkUtils.createIOManager()
- batchTableWrite.withIOManager(ioManager)
+ write.withIOManager(ioManager)
private val rowConverter: InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
@@ -164,12 +171,13 @@ private class PaimonDataWriter(
}
override def write(record: InternalRow): Unit = {
- batchTableWrite.write(rowConverter.apply(record))
+ postWrite(write.writeAndReturn(rowConverter.apply(record)))
}
override def commit(): WriterCommitMessage = {
try {
- val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq
+ preFinish()
+ val commitMessages = write.prepareCommit().asScala.toSeq
TaskCommit(commitMessages)
} finally {
close()
@@ -180,7 +188,7 @@ private class PaimonDataWriter(
override def close(): Unit = {
try {
- batchTableWrite.close()
+ write.close()
ioManager.close()
} catch {
case e: Exception => throw new RuntimeException(e)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 61bf552494..c43170d7ba 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,8 @@
package org.apache.paimon.spark
+import org.apache.paimon.Snapshot.CommitKind._
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -283,4 +285,81 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
}
}
}
+
+ test("Paimon SinK: set full-compaction.delta-commits with batch write") {
+ for (useV2Write <- Seq("true", "false")) {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+ withTable("t") {
+ sql("""
+ |CREATE TABLE t (
+ | a INT,
+ | b INT
+ |) TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'bucket'='1',
+ | 'full-compaction.delta-commits'='1'
+ |)
+ |""".stripMargin)
+
+ sql("INSERT INTO t VALUES (1, 1)")
+ sql("INSERT INTO t VALUES (2, 2)")
+ checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2,
2)))
+ assert(loadTable("t").snapshotManager().latestSnapshot().commitKind
== COMPACT)
+ }
+ }
+ }
+ }
+
+ test("Paimon SinK: set full-compaction.delta-commits with streaming write") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b INT)
+ |TBLPROPERTIES (
+ | 'primary-key'='a',
+ | 'bucket'='1',
+ | 'full-compaction.delta-commits'='2'
+ |)
+ |""".stripMargin)
+ val table = loadTable("T")
+ val location = table.location().toString
+
+ val inputData = MemoryStream[(Int, Int)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .format("paimon")
+ .start(location)
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ inputData.addData((1, 1))
+ stream.processAllAvailable()
+ checkAnswer(query(), Seq(Row(1, 1)))
+ assert(table.snapshotManager().latestSnapshot().commitKind == APPEND)
+
+ inputData.addData((2, 1))
+ stream.processAllAvailable()
+ checkAnswer(query(), Seq(Row(1, 1), Row(2, 1)))
+ assert(table.snapshotManager().latestSnapshot().commitKind == COMPACT)
+
+ inputData.addData((2, 2))
+ stream.processAllAvailable()
+ checkAnswer(query(), Seq(Row(1, 1), Row(2, 2)))
+ assert(table.snapshotManager().latestSnapshot().commitKind == APPEND)
+
+ inputData.addData((3, 1))
+ stream.processAllAvailable()
+ checkAnswer(query(), Seq(Row(1, 1), Row(2, 2), Row(3, 1)))
+ assert(table.snapshotManager().latestSnapshot().commitKind == COMPACT)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}