This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d4edc22aae [core] Fix incremental query with delete after minor 
compact (#5115)
d4edc22aae is described below

commit d4edc22aaed6c029312c17aeec5d7e7b44593443
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Feb 20 10:18:27 2025 +0800

    [core] Fix incremental query with delete after minor compact (#5115)
---
 .../source/splitread/IncrementalDiffSplitRead.java |  2 +-
 .../spark/sql/TableValuedFunctionsTest.scala       | 40 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 2e236b1dff..3b9f28d191 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -98,7 +98,7 @@ public class IncrementalDiffSplitRead implements 
SplitRead<InternalRow> {
                                 split.bucket(),
                                 split.dataFiles(),
                                 split.deletionFiles().orElse(null),
-                                false),
+                                forceKeepDelete),
                         mergeRead.keyComparator(),
                         mergeRead.createUdsComparator(),
                         mergeRead.mergeSorter(),
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index addf846100..117a5af02d 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -245,6 +245,46 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }
 
+  test("Table Valued Functions: incremental query with delete after minor 
compact") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (id INT) USING paimon
+            |TBLPROPERTIES ('primary-key'='id', 'bucket' = '1', 'write-only' = 
'true')
+            |""".stripMargin)
+
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id FROM range (1, 
100001)")
+      sql("CALL sys.compact(table => 'T')")
+      sql("INSERT INTO t VALUES 100001")
+      sql("INSERT INTO t VALUES 100002")
+      sql("CALL sys.create_tag('t', 'tag1')")
+
+      sql(
+        "CALL sys.compact(table => 'T', compact_strategy => 'minor', options 
=> 'num-sorted-run.compaction-trigger=2')")
+      sql("DELETE FROM t WHERE id = 999")
+      sql("CALL sys.create_tag('t', 'tag2')")
+
+      //            tag1                          tag2
+      // l0         f(+I 100001),f(+I 100002)     f(-D 999)
+      // l1
+      // l2
+      // l3
+      // l4                                       f(+I 100001,100002)
+      // l5         f(+I 1-100000)                f(+I 1-100000)
+      checkAnswer(
+        sql("SELECT level FROM `t$files` VERSION AS OF 'tag1' ORDER BY level"),
+        Seq(Row(0), Row(0), Row(5)))
+      checkAnswer(
+        sql("SELECT level FROM `t$files` VERSION AS OF 'tag2' ORDER BY level"),
+        Seq(Row(0), Row(4), Row(5)))
+
+      // before files: f(+I 100001), f(+I 100002)
+      // after files:  f(-D 999),    f(+I 100001,100002)
+      checkAnswer(
+        sql("SELECT * FROM paimon_incremental_query('`t$audit_log`', 'tag1', 
'tag2') ORDER BY id"),
+        Seq(Row("-D", 999)))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): 
DataFrame = {
     spark.read
       .format("paimon")

Reply via email to