This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b9ccf8a602 [SPARK] Fix streaming writer overwriting batch data by scoping OVERWRITE_BY_FILTER to batch only (#7458)
b9ccf8a602 is described below

commit b9ccf8a6026023ad9225664494b28c70f3e9a911
Author: gregdiy <[email protected]>
AuthorDate: Sat May 23 19:33:38 2026 -0700

    [SPARK] Fix streaming writer overwriting batch data by scoping OVERWRITE_BY_FILTER to batch only (#7458)
---
 .../apache/paimon/spark/PaimonSparkTableBase.scala |  3 +-
 .../org/apache/paimon/spark/PaimonSinkTest.scala   | 50 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 1732d6778d..0fc4bd9eb5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -92,17 +92,18 @@ abstract class PaimonSparkTableBase(val table: Table)
   override def capabilities: JSet[TableCapability] = {
     val capabilities = JEnumSet.of(
       TableCapability.BATCH_READ,
-      TableCapability.OVERWRITE_BY_FILTER,
       TableCapability.MICRO_BATCH_READ
     )
 
     if (useV2Write) {
       capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
       capabilities.add(TableCapability.BATCH_WRITE)
+      capabilities.add(TableCapability.OVERWRITE_BY_FILTER)
       capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
     } else {
       capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
       capabilities.add(TableCapability.V1_BATCH_WRITE)
+      capabilities.add(TableCapability.OVERWRITE_BY_FILTER)
     }
 
     capabilities
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 3df52b78b2..dcb99224cd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -362,4 +362,54 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
       }
     }
   }
+
+  test("Paimon Sink: batch then stream should not overwrite batch data") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          // create a primary key table
+          spark.sql(s"""
+                       |CREATE TABLE T (a INT, b STRING)
+                       |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+                       |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          // Phase 1 - batch insert
+          spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+          checkAnswer(
+            spark.sql("SELECT * FROM T ORDER BY a"),
+            Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+          // Phase 2 - streaming should append, not overwrite
+          val inputData = MemoryStream[(Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], id: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          try {
+            inputData.addData((4, "d"))
+            stream.processAllAvailable()
+            // batch data must still be present alongside stream data
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a"),
+              Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Nil)
+
+            inputData.addData((5, "e"))
+            stream.processAllAvailable()
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a"),
+              Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Row(4, "d") :: Row(5, "e") :: Nil)
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
 }

Reply via email to