This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b9ccf8a602 [SPARK] Fix streaming writer overwriting batch data by
scoping OVERWRITE_BY_FILTER to batch only (#7458)
b9ccf8a602 is described below
commit b9ccf8a6026023ad9225664494b28c70f3e9a911
Author: gregdiy <[email protected]>
AuthorDate: Sat May 23 19:33:38 2026 -0700
[SPARK] Fix streaming writer overwriting batch data by scoping
OVERWRITE_BY_FILTER to batch only (#7458)
---
.../apache/paimon/spark/PaimonSparkTableBase.scala | 3 +-
.../org/apache/paimon/spark/PaimonSinkTest.scala | 50 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 1732d6778d..0fc4bd9eb5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -92,17 +92,18 @@ abstract class PaimonSparkTableBase(val table: Table)
override def capabilities: JSet[TableCapability] = {
val capabilities = JEnumSet.of(
TableCapability.BATCH_READ,
- TableCapability.OVERWRITE_BY_FILTER,
TableCapability.MICRO_BATCH_READ
)
if (useV2Write) {
capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
capabilities.add(TableCapability.BATCH_WRITE)
+ capabilities.add(TableCapability.OVERWRITE_BY_FILTER)
capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
} else {
capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
capabilities.add(TableCapability.V1_BATCH_WRITE)
+ capabilities.add(TableCapability.OVERWRITE_BY_FILTER)
}
capabilities
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 3df52b78b2..dcb99224cd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -362,4 +362,54 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
}
}
}
+
+ test("Paimon Sink: batch then stream should not overwrite batch data") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ // create a primary key table
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ // Phase 1 - batch insert
+ spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a"),
+ Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+ // Phase 2 - streaming should append, not overwrite
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], id: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ try {
+ inputData.addData((4, "d"))
+ stream.processAllAvailable()
+ // batch data must still be present alongside stream data
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a"),
+ Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+
+ inputData.addData((5, "e"))
+ stream.processAllAvailable()
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a"),
+ Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Row(5, "e") :: Nil)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}