This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new cccf7c1a628 branch-3.1: [fix](paimon) Align incremental query behavior
with Spark Paimon for single snapshot scenario #58239 (#58253)
cccf7c1a628 is described below
commit cccf7c1a62846abccaa4b1abb3a00264d1f6b49f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 25 14:18:57 2025 +0800
branch-3.1: [fix](paimon) Align incremental query behavior with Spark
Paimon for single snapshot scenario #58239 (#58253)
Cherry-picked from #58239
Co-authored-by: Socrates <[email protected]>
---
.../datasource/paimon/source/PaimonScanNode.java | 12 ++---
.../paimon/source/PaimonScanNodeTest.java | 52 +++++++++-------------
.../external_table_p0/paimon/paimon_incr_read.out | 14 ++++++
.../paimon/paimon_incr_read.groovy | 13 +++---
4 files changed, 49 insertions(+), 42 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 673d2499e7a..8cabcb0cfcc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -553,8 +553,8 @@ public class PaimonScanNode extends FileQueryScanNode {
if (hasStartSnapshotId) {
try {
long startSId =
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
- if (startSId <= 0) {
- throw new UserException("startSnapshotId must be
greater than 0");
+ if (startSId < 0) {
+ throw new UserException("startSnapshotId must be
greater than or equal to 0");
}
} catch (NumberFormatException e) {
throw new UserException("Invalid startSnapshotId format: "
+ e.getMessage());
@@ -564,8 +564,8 @@ public class PaimonScanNode extends FileQueryScanNode {
if (hasEndSnapshotId) {
try {
long endSId =
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
- if (endSId <= 0) {
- throw new UserException("endSnapshotId must be greater
than 0");
+ if (endSId < 0) {
+ throw new UserException("endSnapshotId must be greater
than or equal to 0");
}
} catch (NumberFormatException e) {
throw new UserException("Invalid endSnapshotId format: " +
e.getMessage());
@@ -577,8 +577,8 @@ public class PaimonScanNode extends FileQueryScanNode {
try {
long startSId =
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
long endSId =
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
- if (startSId >= endSId) {
- throw new UserException("startSnapshotId must be less
than endSnapshotId");
+ if (startSId > endSId) {
+ throw new UserException("startSnapshotId must be less
than or equal to endSnapshotId");
}
} catch (NumberFormatException e) {
throw new UserException("Invalid snapshot ID format: " +
e.getMessage());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index da8023b2b41..93afa390530 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -126,14 +126,14 @@ public class PaimonScanNodeTest {
// Test valid parameter combinations
// 1. Only startSnapshotId
- Map<String, String> params = new HashMap<>();
- params.put("startSnapshotId", "5");
+ Map<String, String> params1 = new HashMap<>();
+ params1.put("startSnapshotId", "5");
ExceptionChecker.expectThrowsWithMsg(UserException.class,
"endSnapshotId is required when using snapshot-based
incremental read",
- () -> PaimonScanNode.validateIncrementalReadParams(params));
+ () -> PaimonScanNode.validateIncrementalReadParams(params1));
// 2. Both startSnapshotId and endSnapshotId
- params.clear();
+ Map<String, String> params = new HashMap<>();
params.put("startSnapshotId", "1");
params.put("endSnapshotId", "5");
Map<String, String> result =
PaimonScanNode.validateIncrementalReadParams(params);
@@ -226,57 +226,47 @@ public class PaimonScanNodeTest {
e.getMessage().contains("startSnapshotId is required when
using snapshot-based incremental read"));
}
- // 11. Test invalid snapshot ID values (≤ 0)
- params.clear();
- params.put("startSnapshotId", "0");
- try {
- PaimonScanNode.validateIncrementalReadParams(params);
- Assert.fail("Should throw exception for startSnapshotId ≤ 0");
- } catch (UserException e) {
- Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
greater than 0"));
- }
-
+ // 11. Test invalid snapshot ID values (< 0)
params.clear();
params.put("startSnapshotId", "-1");
try {
PaimonScanNode.validateIncrementalReadParams(params);
- Assert.fail("Should throw exception for negative startSnapshotId");
+ Assert.fail("Should throw exception for startSnapshotId < 0");
} catch (UserException e) {
- Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
greater than 0"));
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
greater than or equal to 0"));
}
params.clear();
params.put("startSnapshotId", "1");
- params.put("endSnapshotId", "0");
+ params.put("endSnapshotId", "-1");
try {
PaimonScanNode.validateIncrementalReadParams(params);
- Assert.fail("Should throw exception for endSnapshotId ≤ 0");
+ Assert.fail("Should throw exception for endSnapshotId < 0");
} catch (UserException e) {
- Assert.assertTrue(e.getMessage().contains("endSnapshotId must be
greater than 0"));
+ Assert.assertTrue(e.getMessage().contains("endSnapshotId must be
greater than or equal to 0"));
}
- // 12. Test start ≥ end for snapshot IDs
+ // 12. Test start > end for snapshot IDs
params.clear();
- params.put("startSnapshotId", "5");
+ params.put("startSnapshotId", "6");
params.put("endSnapshotId", "5");
try {
PaimonScanNode.validateIncrementalReadParams(params);
- Assert.fail("Should throw exception when startSnapshotId =
endSnapshotId");
+ Assert.fail("Should throw exception when startSnapshotId >
endSnapshotId");
} catch (UserException e) {
- Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
less than endSnapshotId"));
+ Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
less than or equal to endSnapshotId"));
}
+ // 12.1. Test startSnapshotId == endSnapshotId (should be allowed, consistent with Spark Paimon behavior)
params.clear();
- params.put("startSnapshotId", "6");
+ params.put("startSnapshotId", "5");
params.put("endSnapshotId", "5");
- try {
- PaimonScanNode.validateIncrementalReadParams(params);
- Assert.fail("Should throw exception when startSnapshotId >
endSnapshotId");
- } catch (UserException e) {
- Assert.assertTrue(e.getMessage().contains("startSnapshotId must be
less than endSnapshotId"));
- }
+ result = PaimonScanNode.validateIncrementalReadParams(params);
+ Assert.assertEquals("5,5", result.get("incremental-between"));
+ Assert.assertTrue(result.containsKey("scan.mode") &&
result.get("scan.mode") == null);
+ Assert.assertEquals(3, result.size());
- // 13. Test invalid timestamp values (≤ 0)
+ // 13. Test invalid timestamp values (< 0)
params.clear();
params.put("startTimestamp", "-1");
try {
diff --git a/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
index 6d776ec8df8..ad49163cc36 100644
--- a/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
+++ b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
@@ -32,6 +32,13 @@
-- !scan_mode4 --
+-- !snapshot_id_0_0_empty --
+
+-- !snapshot_id_0_1 --
+1 Alice 30
+
+-- !snapshot_id_1_1_empty --
+
-- !cte --
Bob 25
Charlie 28
@@ -73,6 +80,13 @@ Alice 30
-- !scan_mode4 --
+-- !snapshot_id_0_0_empty --
+
+-- !snapshot_id_0_1 --
+1 Alice 30
+
+-- !snapshot_id_1_1_empty --
+
-- !cte --
Bob 25
Charlie 28
diff --git
a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
index 08a271e054b..3f5b2accf10 100644
--- a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
+++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
@@ -54,7 +54,10 @@ suite("test_paimon_incr_read",
"p0,external,doris,external_docker,external_docke
order_qt_scan_mode2 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'diff');"""
order_qt_scan_mode3 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'delta');"""
order_qt_scan_mode4 """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2,
'incrementalBetweenScanMode' = 'changelog');"""
-
+
+ order_qt_snapshot_id_0_0_empty """select * from
paimon_incr@incr('startSnapshotId'=0, 'endSnapshotId'=0)"""
+ order_qt_snapshot_id_0_1 """select * from
paimon_incr@incr('startSnapshotId'=0, 'endSnapshotId'=1)"""
+ order_qt_snapshot_id_1_1_empty """select * from
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=1)"""
// complex query
qt_cte """with cte1 as (select * from
paimon_incr@incr('startTimestamp'=0)) select name, age from cte1 order by
age;"""
@@ -84,10 +87,6 @@ suite("test_paimon_incr_read",
"p0,external,doris,external_docker,external_docke
sql """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'error');"""
exception "incrementalBetweenScanMode must be one of"
}
- test {
- sql """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=1)"""
- exception "startSnapshotId must be less than endSnapshotId"
- }
test {
sql """select * from paimon_incr@incr('startSnapshotId'=1)"""
exception "endSnapshotId is required when using snapshot-based
incremental read"
@@ -96,6 +95,10 @@ suite("test_paimon_incr_read",
"p0,external,doris,external_docker,external_docke
sql """select * from paimon_incr@incr('startSnapshotId'=1,
'endSnapshotId'=2) for version as of 1"""
exception "should not spec both snapshot and scan params"
}
+ test {
+ sql """select * from paimon_incr@incr('startSnapshotId'=-1)"""
+ exception "startSnapshotId must be greater than or equal to 0"
+ }
}
test_incr_read("false")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]