This is an automated email from the ASF dual-hosted git repository.

Aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b965687700 [core] Fix wrong merge order of increment diff split read 
(#7033)
b965687700 is described below

commit b96568770046ae83a3177dbbeccfd66d6e0823da
Author: LiangDai-Mars <[email protected]>
AuthorDate: Tue May 19 19:20:51 2026 +0800

    [core] Fix wrong merge order of increment diff split read (#7033)
---
 .../source/splitread/IncrementalDiffSplitRead.java |  6 ++---
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 26 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 1e93f148b7..c5032158fa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -206,10 +206,8 @@ public class IncrementalDiffSplitRead implements 
SplitRead<InternalRow> {
             } else if (kvs.size() == 2) {
                 KeyValue before = kvs.get(0);
                 KeyValue after = kvs.get(1);
-                if (after.level() == AFTER_LEVEL) {
-                    if (!valueAndRowKindEquals(before, after)) {
-                        toReturn = after;
-                    }
+                if (!valueAndRowKindEquals(before, after)) {
+                    toReturn = after.level() == AFTER_LEVEL ? after : before;
                 }
             } else {
                 throw new IllegalArgumentException("Illegal kv number: " + 
kvs.size());
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index e80e45b1f8..93ef7fbe4d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -1204,6 +1204,32 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
                         Row.of("+I", 2, "B"), Row.of("-D", 2, "B"), 
Row.of("+I", 3, "C"));
     }
 
+    @Test
+    public void testIncrementScanModeWithInsertOverwrite() throws Exception {
+
+        sql("CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED, v 
STRING)");
+
+        // snapshot 1
+        sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'A'), (1, 'B'), (1, 
'C')");
+        // snapshot 2
+        sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'C'), (1, 'D')");
+
+        List<Row> result =
+                sql(
+                        "SELECT * FROM `test_scan_mode$audit_log` "
+                                + "/*+ 
OPTIONS('incremental-between'='1,2','incremental-between-scan-mode'='diff') 
*/");
+        assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 1, "D"));
+
+        // snapshot 3
+        sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'D')");
+
+        result =
+                sql(
+                        "SELECT * FROM `test_scan_mode$audit_log` "
+                                + "/*+ 
OPTIONS('incremental-between'='2,2','incremental-between-scan-mode'='diff') 
*/");
+        assertThat(result).isEmpty();
+    }
+
     @Test
     public void testAuditLogTableWithComputedColumn() throws Exception {
         sql("CREATE TABLE test_table (a int, b int, c AS a + b);");

Reply via email to