This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed49eff445 [spark] Implement abort cleanup for uncommitted data files (#7831)
ed49eff445 is described below
commit ed49eff4453c483cd7a46988eebf25744d0ebe7e
Author: Zouxxyy <[email protected]>
AuthorDate: Thu May 14 00:19:00 2026 +0800
[spark] Implement abort cleanup for uncommitted data files (#7831)
---
.../paimon/spark/write/PaimonBatchWrite.scala | 17 ++++-
.../paimon/spark/write/PaimonBatchWrite.scala | 17 ++++-
.../apache/paimon/spark/sql/SparkWriteITCase.scala | 78 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index d546eebf4c..bde8f028c5 100644
--- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -47,6 +47,8 @@ case class PaimonBatchWrite(
protected val metricRegistry = SparkMetricRegistry()
+ @volatile private var commitStarted: Boolean = false
+
protected val batchWriteBuilder: BatchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
@@ -68,6 +70,7 @@ case class PaimonBatchWrite(
override def useCommitCoordinator(): Boolean = false
override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ commitStarted = true
logInfo(s"Committing to table ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
batchTableCommit.withMetricRegistry(metricRegistry)
@@ -107,7 +110,19 @@ case class PaimonBatchWrite(
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
- // TODO clean uncommitted files
+ if (commitStarted) {
+ logWarning(s"Skip abort cleanup for table ${table.name()} because commit has already started")
+ return
+ }
+
+ logInfo(s"Aborting write to table ${table.name()}")
+ val batchTableCommit = batchWriteBuilder.newCommit()
+ try {
+ val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
+ batchTableCommit.abort(commitMessages.asJava)
+ } finally {
+ batchTableCommit.close()
+ }
}
private def buildDeletedCommitMessage(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 92aeae0313..1f2abae0b0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -47,6 +47,8 @@ case class PaimonBatchWrite(
protected val metricRegistry = SparkMetricRegistry()
+ @volatile private var commitStarted: Boolean = false
+
protected val batchWriteBuilder: BatchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
@@ -68,6 +70,7 @@ case class PaimonBatchWrite(
override def useCommitCoordinator(): Boolean = false
override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ commitStarted = true
logInfo(s"Committing to table ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
batchTableCommit.withMetricRegistry(metricRegistry)
@@ -107,7 +110,19 @@ case class PaimonBatchWrite(
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {
- // TODO clean uncommitted files
+ if (commitStarted) {
+ logWarning(s"Skip abort cleanup for table ${table.name()} because commit has already started")
+ return
+ }
+
+ logInfo(s"Aborting write to table ${table.name()}")
+ val batchTableCommit = batchWriteBuilder.newCommit()
+ try {
+ val commitMessages = WriteTaskResult.merge(messages.filter(_ != null))
+ batchTableCommit.abort(commitMessages.asJava)
+ } finally {
+ batchTableCommit.close()
+ }
}
private def buildDeletedCommitMessage(
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
index b2ae78f1ce..7496c17854 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkWriteITCase.scala
@@ -23,16 +23,21 @@ import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.schema.Schema
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.write.{PaimonBatchWrite, WriteTaskResult}
+import org.apache.paimon.table.sink.CommitMessageImpl
import org.apache.paimon.types.DataTypes
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions
import java.sql.Timestamp
import java.time.LocalDateTime
+import scala.collection.JavaConverters._
+
class SparkWriteWithNoExtensionITCase extends SparkWriteITCase {
/** Disable the spark extension. */
@@ -272,6 +277,79 @@ class SparkWriteITCase extends PaimonSparkTestBase {
}
}
+ test("Paimon Write: abort cleans uncommitted files") {
+ withTable("T") {
+ spark.sql(
+ "CREATE TABLE T (id INT, data INT) TBLPROPERTIES ('bucket' = '1', 'bucket-key' = 'id')")
+
+ val table = loadTable("T")
+ val sparkSchema = spark.table("T").schema
+ val batchWrite = PaimonBatchWrite(table, sparkSchema, sparkSchema, None, None)
+ val dataWriter = batchWrite.createBatchWriterFactory(null).createWriter(0, 0L)
+
+ dataWriter.write(new GenericInternalRow(Array[Any](1, 10)))
+ val writerCommitMessage = dataWriter.commit()
+ val dataFilePaths = dataFilePathsFromWriteTaskResult(table, writerCommitMessage)
+
+ assertThat(dataFilePaths.size).isGreaterThan(0)
+ dataFilePaths.foreach(path => assertThat(table.fileIO().exists(path)).isTrue)
+
+ batchWrite.abort(Array(writerCommitMessage, null))
+
+ dataFilePaths.foreach(path => assertThat(table.fileIO().exists(path)).isFalse)
+ assertThat(table.latestSnapshot()).isEmpty
+ }
+ }
+
+ test("Paimon Write: abort skips cleanup after commit starts") {
+ withTable("T") {
+ spark.sql(
+ "CREATE TABLE T (id INT, data INT) TBLPROPERTIES ('bucket' = '1', 'bucket-key' = 'id')")
+
+ val table = loadTable("T")
+ val sparkSchema = spark.table("T").schema
+ val batchWrite = PaimonBatchWrite(table, sparkSchema, sparkSchema, None, None)
+ val dataWriter = batchWrite.createBatchWriterFactory(null).createWriter(0, 0L)
+
+ dataWriter.write(new GenericInternalRow(Array[Any](1, 10)))
+ val writerCommitMessage = dataWriter.commit()
+ val dataFilePaths = dataFilePathsFromWriteTaskResult(table, writerCommitMessage)
+
+ assertThat(dataFilePaths.size).isGreaterThan(0)
+ dataFilePaths.foreach(path => assertThat(table.fileIO().exists(path)).isTrue)
+
+ try {
+ batchWrite.commit(Array(writerCommitMessage))
+ } catch {
+ case _: Throwable =>
+ }
+ assertThat(table.latestSnapshot()).isPresent
+
+ batchWrite.abort(Array(writerCommitMessage, null))
+
+ dataFilePaths.foreach(path => assertThat(table.fileIO().exists(path)).isTrue)
+ checkAnswer(spark.sql("SELECT * FROM T"), Row(1, 10) :: Nil)
+ }
+ }
+
+ private def dataFilePathsFromWriteTaskResult(
+ table: org.apache.paimon.table.FileStoreTable,
+ writerCommitMessage: org.apache.spark.sql.connector.write.WriterCommitMessage) = {
+ WriteTaskResult.merge(Seq(writerCommitMessage)).flatMap {
+ case commitMessage: CommitMessageImpl =>
+ val pathFactory = table
+ .store()
+ .pathFactory()
+ .createDataFilePathFactory(commitMessage.partition(), commitMessage.bucket())
+ commitMessage
+ .newFilesIncrement()
+ .newFiles()
+ .asScala
+ .map(pathFactory.toPath)
+ case _ => Seq.empty
+ }
+ }
+
test("Paimon write: write table with timestamp3 bucket key") {
withTable("t") {
// create timestamp3 table using table api