This is an automated email from the ASF dual-hosted git repository.
Aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b965687700 [core] Fix wrong merge order of increment diff split read (#7033)
b965687700 is described below
commit b96568770046ae83a3177dbbeccfd66d6e0823da
Author: LiangDai-Mars <[email protected]>
AuthorDate: Tue May 19 19:20:51 2026 +0800
[core] Fix wrong merge order of increment diff split read (#7033)
---
.../source/splitread/IncrementalDiffSplitRead.java | 6 ++---
.../apache/paimon/flink/BatchFileStoreITCase.java | 26 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
index 1e93f148b7..c5032158fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java
@@ -206,10 +206,8 @@ public class IncrementalDiffSplitRead implements SplitRead<InternalRow> {
} else if (kvs.size() == 2) {
KeyValue before = kvs.get(0);
KeyValue after = kvs.get(1);
- if (after.level() == AFTER_LEVEL) {
- if (!valueAndRowKindEquals(before, after)) {
- toReturn = after;
- }
+ if (!valueAndRowKindEquals(before, after)) {
+ toReturn = after.level() == AFTER_LEVEL ? after : before;
}
} else {
throw new IllegalArgumentException("Illegal kv number: " + kvs.size());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index e80e45b1f8..93ef7fbe4d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -1204,6 +1204,32 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.of("+I", 2, "B"), Row.of("-D", 2, "B"),
Row.of("+I", 3, "C"));
}
+ @Test
+ public void testIncrementScanModeWithInsertOverwrite() throws Exception {
+
+ sql("CREATE TABLE test_scan_mode (id INT PRIMARY KEY NOT ENFORCED, v STRING)");
+
+ // snapshot 1
+ sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'A'), (1, 'B'), (1, 'C')");
+ // snapshot 2
+ sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'C'), (1, 'D')");
+
+ List<Row> result =
+ sql(
+ "SELECT * FROM `test_scan_mode$audit_log` "
+ + "/*+ OPTIONS('incremental-between'='1,2','incremental-between-scan-mode'='diff') */");
+ assertThat(result).containsExactlyInAnyOrder(Row.of("+I", 1, "D"));
+
+ // snapshot 3
+ sql("INSERT OVERWRITE test_scan_mode VALUES (1, 'D')");
+
+ result =
+ sql(
+ "SELECT * FROM `test_scan_mode$audit_log` "
+ + "/*+ OPTIONS('incremental-between'='2,2','incremental-between-scan-mode'='diff') */");
+ assertThat(result).isEmpty();
+ }
+
@Test
public void testAuditLogTableWithComputedColumn() throws Exception {
sql("CREATE TABLE test_table (a int, b int, c AS a + b);");