This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ceea9c09 [spark] Spark write supports 'full-compaction.delta-commits' (#6364)
81ceea9c09 is described below

commit 81ceea9c09fb95a706771d392006bc7f65992802
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 9 15:53:35 2025 +0800

    [spark] Spark write supports 'full-compaction.delta-commits' (#6364)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  8 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |  4 +-
 .../org/apache/paimon/spark/SparkTableWrite.scala  | 22 +++---
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 16 ++++-
 .../spark/commands/WriteIntoPaimonTable.scala      |  5 +-
 .../apache/paimon/spark/sources/PaimonSink.scala   |  4 +-
 .../paimon/spark/write/DataWriteHelper.scala       | 68 +++++++++++++++++++
 .../apache/paimon/spark/write/PaimonV2Write.scala  | 36 ++++++----
 .../org/apache/paimon/spark/PaimonSinkTest.scala   | 79 ++++++++++++++++++++++
 10 files changed, 210 insertions(+), 34 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index f2e90e5dce..42c574d604 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -534,7 +534,7 @@ under the License.
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>Full compaction will be constantly triggered after delta 
commits.</td>
+            <td>For streaming write, full compaction will be constantly 
triggered after delta commits. For batch write, full compaction will be 
triggered with each commit as long as this value is greater than 0.</td>
         </tr>
         <tr>
             <td><h5>ignore-delete</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e2046da926..44e4c48f34 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1204,7 +1204,8 @@ public class CoreOptions implements Serializable {
                     .intType()
                     .noDefaultValue()
                     .withDescription(
-                            "Full compaction will be constantly triggered 
after delta commits.");
+                            "For streaming write, full compaction will be 
constantly triggered after delta commits. "
+                                    + "For batch write, full compaction will 
be triggered with each commit as long as this value is greater than 0.");
 
     @ExcludeFromDocumentation("Internal use only")
     public static final ConfigOption<StreamScanMode> STREAM_SCAN_MODE =
@@ -2788,6 +2789,11 @@ public class CoreOptions implements Serializable {
         return consumerId;
     }
 
+    @Nullable
+    public Integer fullCompactionDeltaCommits() {
+        return options.get(FULL_COMPACTION_DELTA_COMMITS);
+    }
+
     public static StreamingReadMode streamReadType(Options options) {
         return options.get(STREAMING_READ_MODE);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 905d8ad647..a05d198ffa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -60,7 +60,6 @@ import static 
org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
-import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
 import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
@@ -563,8 +562,7 @@ public class SchemaValidation {
                         "Cannot define 'bucket-key' with bucket = -1, please 
remove the 'bucket-key' setting or specify a bucket number.");
             }
 
-            if (schema.primaryKeys().isEmpty()
-                    && 
options.toMap().get(FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
+            if (schema.primaryKeys().isEmpty() && 
options.fullCompactionDeltaCommits() != null) {
                 throw new RuntimeException(
                         "AppendOnlyTable of unaware or dynamic bucket does not 
support 'full-compaction.delta-commits'");
             }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
index 704a6c2d5b..5169e6ba76 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -20,11 +20,11 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.disk.IOManager
 import org.apache.paimon.spark.util.SparkRowUtils
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, 
CommitMessageImpl, CommitMessageSerializer}
+import org.apache.paimon.spark.write.DataWriteHelper
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessageImpl, 
CommitMessageSerializer, TableWriteImpl}
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.TaskContext
-import org.apache.spark.sql.{PaimonUtils, Row}
+import org.apache.spark.sql.Row
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -33,18 +33,21 @@ case class SparkTableWrite(
     writeBuilder: BatchWriteBuilder,
     writeType: RowType,
     rowKindColIdx: Int = -1,
-    writeRowTracking: Boolean = false)
-  extends SparkTableWriteTrait {
+    writeRowTracking: Boolean = false,
+    fullCompactionDeltaCommits: Option[Int],
+    batchId: Long)
+  extends SparkTableWriteTrait
+  with DataWriteHelper {
 
   private val ioManager: IOManager = SparkUtils.createIOManager
 
-  private val write: BatchTableWrite = {
+  val write: TableWriteImpl[Row] = {
     val _write = writeBuilder.newWrite()
     _write.withIOManager(ioManager)
     if (writeRowTracking) {
       _write.withWriteType(writeType)
     }
-    _write
+    _write.asInstanceOf[TableWriteImpl[Row]]
   }
 
   private val toPaimonRow = {
@@ -52,14 +55,15 @@ case class SparkTableWrite(
   }
 
   def write(row: Row): Unit = {
-    write.write(toPaimonRow(row))
+    postWrite(write.writeAndReturn(toPaimonRow(row)))
   }
 
   def write(row: Row, bucket: Int): Unit = {
-    write.write(toPaimonRow(row), bucket)
+    postWrite(write.writeAndReturn(toPaimonRow(row), bucket))
   }
 
   def finish(): Iterator[Array[Byte]] = {
+    preFinish()
     var bytesWritten = 0L
     var recordsWritten = 0L
     val commitMessages = new ListBuffer[Array[Byte]]()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 6d0563b364..4a8392f55f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -51,13 +51,19 @@ import java.util.Collections.singletonMap
 
 import scala.collection.JavaConverters._
 
-case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean 
= false)
+case class PaimonSparkWriter(
+    table: FileStoreTable,
+    writeRowTracking: Boolean = false,
+    batchId: Long = -1)
   extends WriteHelper {
 
   private lazy val tableSchema = table.schema
 
   private lazy val bucketMode = table.bucketMode
 
+  private val fullCompactionDeltaCommits: Option[Int] =
+    Option.apply(coreOptions.fullCompactionDeltaCommits())
+
   @transient private lazy val serializer = new CommitMessageSerializer
 
   private val writeType = {
@@ -98,7 +104,13 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowTracking: Boolean =
     val bucketColIdx = SparkRowUtils.getFieldIndex(withInitBucketCol.schema, 
BUCKET_COL)
     val encoderGroupWithBucketCol = EncoderSerDeGroup(withInitBucketCol.schema)
 
-    def newWrite() = SparkTableWrite(writeBuilder, writeType, rowKindColIdx, 
writeRowTracking)
+    def newWrite() = SparkTableWrite(
+      writeBuilder,
+      writeType,
+      rowKindColIdx,
+      writeRowTracking,
+      fullCompactionDeltaCommits,
+      batchId)
 
     def sparkParallelism = {
       val defaultParallelism = sparkSession.sparkContext.defaultParallelism
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 5a9b36e269..2056590ead 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -36,7 +36,8 @@ case class WriteIntoPaimonTable(
     override val originTable: FileStoreTable,
     saveMode: SaveMode,
     _data: DataFrame,
-    options: Options)
+    options: Options,
+    batchId: Long = -1)
   extends RunnableCommand
   with ExpressionHelper
   with SchemaHelper
@@ -50,7 +51,7 @@ case class WriteIntoPaimonTable(
     updateTableWithOptions(
       Map(DYNAMIC_PARTITION_OVERWRITE.key -> 
dynamicPartitionOverwriteMode.toString))
 
-    val writer = PaimonSparkWriter(table)
+    val writer = PaimonSparkWriter(table, batchId = batchId)
     if (overwritePartition != null) {
       writer.writeBuilder.withOverwrite(overwritePartition.asJava)
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 3387a536ab..1c51208089 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -43,8 +43,8 @@ class PaimonSink(
     } else {
       InsertInto
     }
-    partitionColumns.foreach(println)
     val newData = PaimonUtils.createNewDataFrame(data)
-    WriteIntoPaimonTable(originTable, saveMode, newData, 
options).run(sqlContext.sparkSession)
+    WriteIntoPaimonTable(originTable, saveMode, newData, options, batchId).run(
+      sqlContext.sparkSession)
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
new file mode 100644
index 0000000000..6334eebe69
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWriteHelper.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.table.sink.{BatchTableWrite, SinkRecord}
+
+import org.apache.spark.internal.Logging
+
+import scala.collection.mutable
+
+trait DataWriteHelper extends Logging {
+
+  val write: BatchTableWrite
+
+  val fullCompactionDeltaCommits: Option[Int]
+
+  /**
+   * For batch write, batchId is -1, for streaming write, batchId is the 
current batch id (>= 0).
+   */
+  val batchId: Long
+
+  private val needFullCompaction: Boolean = {
+    fullCompactionDeltaCommits match {
+      case Some(deltaCommits) =>
+        deltaCommits > 0 && (batchId == -1 || (batchId + 1) % deltaCommits == 
0)
+      case None => false
+    }
+  }
+
+  private val writtenBuckets = mutable.Set[(BinaryRow, Integer)]()
+
+  def postWrite(record: SinkRecord): Unit = {
+    if (record == null) {
+      return
+    }
+
+    if (needFullCompaction && !writtenBuckets.contains((record.partition(), 
record.bucket()))) {
+      writtenBuckets.add((record.partition().copy(), record.bucket()))
+    }
+  }
+
+  def preFinish(): Unit = {
+    if (needFullCompaction && writtenBuckets.nonEmpty) {
+      logInfo("Start to compact buckets: " + writtenBuckets)
+      writtenBuckets.foreach(
+        (bucket: (BinaryRow, Integer)) => {
+          write.compact(bucket._1, bucket._2, true)
+        })
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 9eaa1bf72f..62a383eb7c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
 import org.apache.paimon.spark.commands.SchemaHelper
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, 
CommitMessage, CommitMessageSerializer}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageSerializer, TableWriteImpl}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
@@ -103,8 +103,11 @@ private case class PaimonBatchWrite(
     builder
   }
 
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
-    WriterFactory(writeSchema, dataSchema, batchWriteBuilder)
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
+    val fullCompactionDeltaCommits: Option[Int] =
+      Option.apply(coreOptions.fullCompactionDeltaCommits())
+    WriterFactory(writeSchema, dataSchema, batchWriteBuilder, 
fullCompactionDeltaCommits)
+  }
 
   override def useCommitCoordinator(): Boolean = false
 
@@ -139,23 +142,27 @@ private case class PaimonBatchWrite(
 private case class WriterFactory(
     writeSchema: StructType,
     dataSchema: StructType,
-    batchWriteBuilder: BatchWriteBuilder)
+    batchWriteBuilder: BatchWriteBuilder,
+    fullCompactionDeltaCommits: Option[Int])
   extends DataWriterFactory {
 
   override def createWriter(partitionId: Int, taskId: Long): 
DataWriter[InternalRow] = {
-    val batchTableWrite = batchWriteBuilder.newWrite()
-    new PaimonDataWriter(batchTableWrite, writeSchema, dataSchema)
+    val batchTableWrite = 
batchWriteBuilder.newWrite().asInstanceOf[TableWriteImpl[InternalRow]]
+    PaimonDataWriter(batchTableWrite, writeSchema, dataSchema, 
fullCompactionDeltaCommits)
   }
 }
 
-private class PaimonDataWriter(
-    batchTableWrite: BatchTableWrite,
+private case class PaimonDataWriter(
+    write: TableWriteImpl[InternalRow],
     writeSchema: StructType,
-    dataSchema: StructType)
-  extends DataWriter[InternalRow] {
+    dataSchema: StructType,
+    fullCompactionDeltaCommits: Option[Int],
+    batchId: Long = -1)
+  extends DataWriter[InternalRow]
+  with DataWriteHelper {
 
   private val ioManager = SparkUtils.createIOManager()
-  batchTableWrite.withIOManager(ioManager)
+  write.withIOManager(ioManager)
 
   private val rowConverter: InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
@@ -164,12 +171,13 @@ private class PaimonDataWriter(
   }
 
   override def write(record: InternalRow): Unit = {
-    batchTableWrite.write(rowConverter.apply(record))
+    postWrite(write.writeAndReturn(rowConverter.apply(record)))
   }
 
   override def commit(): WriterCommitMessage = {
     try {
-      val commitMessages = batchTableWrite.prepareCommit().asScala.toSeq
+      preFinish()
+      val commitMessages = write.prepareCommit().asScala.toSeq
       TaskCommit(commitMessages)
     } finally {
       close()
@@ -180,7 +188,7 @@ private class PaimonDataWriter(
 
   override def close(): Unit = {
     try {
-      batchTableWrite.close()
+      write.close()
       ioManager.close()
     } catch {
       case e: Exception => throw new RuntimeException(e)
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 61bf552494..c43170d7ba 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.Snapshot.CommitKind._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -283,4 +285,81 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
       }
     }
   }
+
+  test("Paimon SinK: set full-compaction.delta-commits with batch write") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+        withTable("t") {
+          sql("""
+                |CREATE TABLE t (
+                |  a INT,
+                |  b INT
+                |) TBLPROPERTIES (
+                |  'primary-key'='a',
+                |  'bucket'='1',
+                |  'full-compaction.delta-commits'='1'
+                |)
+                |""".stripMargin)
+
+          sql("INSERT INTO t VALUES (1, 1)")
+          sql("INSERT INTO t VALUES (2, 2)")
+          checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2, 
2)))
+          assert(loadTable("t").snapshotManager().latestSnapshot().commitKind 
== COMPACT)
+        }
+      }
+    }
+  }
+
+  test("Paimon SinK: set full-compaction.delta-commits with streaming write") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b INT)
+                       |TBLPROPERTIES (
+                       |  'primary-key'='a',
+                       |  'bucket'='1',
+                       |  'full-compaction.delta-commits'='2'
+                       |)
+                       |""".stripMargin)
+          val table = loadTable("T")
+          val location = table.location().toString
+
+          val inputData = MemoryStream[(Int, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("paimon")
+            .start(location)
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+          try {
+            inputData.addData((1, 1))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == 
APPEND)
+
+            inputData.addData((2, 1))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1), Row(2, 1)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == 
COMPACT)
+
+            inputData.addData((2, 2))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1), Row(2, 2)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == 
APPEND)
+
+            inputData.addData((3, 1))
+            stream.processAllAvailable()
+            checkAnswer(query(), Seq(Row(1, 1), Row(2, 2), Row(3, 1)))
+            assert(table.snapshotManager().latestSnapshot().commitKind == 
COMPACT)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }

Reply via email to