This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f36a18cc1 [fix](paimon) Align incremental query behavior with Spark 
Paimon for single snapshot scenario (#58239)
a5f36a18cc1 is described below

commit a5f36a18cc1ecd7a745e7bade177728330921841
Author: Socrates <[email protected]>
AuthorDate: Sat Nov 22 16:24:43 2025 +0800

    [fix](paimon) Align incremental query behavior with Spark Paimon for single 
snapshot scenario (#58239)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    When a Paimon table has only 1 snapshot, users cannot perform
    incremental queries. The validation logic in Doris has two issues:
    
    1. It rejects queries where `startSnapshotId = endSnapshotId`:
    ```sql
    SELECT * FROM tb_simple@incr('startSnapshotId'='1', 'endSnapshotId'='1');
    -- Error: startSnapshotId must be less than endSnapshotId
    ```
    
    2. It rejects queries where `startSnapshotId = 0` (which is needed to
    query all data from a single snapshot):
    ```sql
    SELECT * FROM tb_simple@incr('startSnapshotId'='0', 'endSnapshotId'='1');
    -- Error: startSnapshotId must be greater than 0
    ```
    
    This behavior is inconsistent with Spark Paimon, which:
    - Allows `startSnapshotId = endSnapshotId` (returns an empty result)
    - Allows `startSnapshotId = 0` to query all data from the initial state
    to the specified snapshot
    
    ## Solution
    
    Align Doris incremental query behavior with Spark Paimon:
    
    1. **Allow `startSnapshotId = 0`**: This enables querying all data from
    a single snapshot by using `startSnapshotId=0, endSnapshotId=1`
    2. **Allow `startSnapshotId = endSnapshotId`**: This matches Spark
    Paimon behavior (returns an empty result when querying the same snapshot)
    3. **Update validation**: Allow `startSnapshotId >= 0` and
    `endSnapshotId >= 0` (previously `> 0`)
---
 .../datasource/paimon/source/PaimonScanNode.java   | 12 ++---
 .../paimon/source/PaimonScanNodeTest.java          | 52 +++++++++-------------
 .../external_table_p0/paimon/paimon_incr_read.out  | 14 ++++++
 .../paimon/paimon_incr_read.groovy                 | 13 +++---
 4 files changed, 49 insertions(+), 42 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 092eb8ca0ff..402bf3d0ef6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -553,8 +553,8 @@ public class PaimonScanNode extends FileQueryScanNode {
             if (hasStartSnapshotId) {
                 try {
                     long startSId = 
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
-                    if (startSId <= 0) {
-                        throw new UserException("startSnapshotId must be 
greater than 0");
+                    if (startSId < 0) {
+                        throw new UserException("startSnapshotId must be 
greater than or equal to 0");
                     }
                 } catch (NumberFormatException e) {
                     throw new UserException("Invalid startSnapshotId format: " 
+ e.getMessage());
@@ -564,8 +564,8 @@ public class PaimonScanNode extends FileQueryScanNode {
             if (hasEndSnapshotId) {
                 try {
                     long endSId = 
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
-                    if (endSId <= 0) {
-                        throw new UserException("endSnapshotId must be greater 
than 0");
+                    if (endSId < 0) {
+                        throw new UserException("endSnapshotId must be greater 
than or equal to 0");
                     }
                 } catch (NumberFormatException e) {
                     throw new UserException("Invalid endSnapshotId format: " + 
e.getMessage());
@@ -577,8 +577,8 @@ public class PaimonScanNode extends FileQueryScanNode {
                 try {
                     long startSId = 
Long.parseLong(params.get(DORIS_START_SNAPSHOT_ID));
                     long endSId = 
Long.parseLong(params.get(DORIS_END_SNAPSHOT_ID));
-                    if (startSId >= endSId) {
-                        throw new UserException("startSnapshotId must be less 
than endSnapshotId");
+                    if (startSId > endSId) {
+                        throw new UserException("startSnapshotId must be less 
than or equal to endSnapshotId");
                     }
                 } catch (NumberFormatException e) {
                     throw new UserException("Invalid snapshot ID format: " + 
e.getMessage());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index da8023b2b41..93afa390530 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -126,14 +126,14 @@ public class PaimonScanNodeTest {
         // Test valid parameter combinations
 
         // 1. Only startSnapshotId
-        Map<String, String> params = new HashMap<>();
-        params.put("startSnapshotId", "5");
+        Map<String, String> params1 = new HashMap<>();
+        params1.put("startSnapshotId", "5");
         ExceptionChecker.expectThrowsWithMsg(UserException.class,
                 "endSnapshotId is required when using snapshot-based 
incremental read",
-                () -> PaimonScanNode.validateIncrementalReadParams(params));
+                () -> PaimonScanNode.validateIncrementalReadParams(params1));
 
         // 2. Both startSnapshotId and endSnapshotId
-        params.clear();
+        Map<String, String> params = new HashMap<>();
         params.put("startSnapshotId", "1");
         params.put("endSnapshotId", "5");
         Map<String, String> result = 
PaimonScanNode.validateIncrementalReadParams(params);
@@ -226,57 +226,47 @@ public class PaimonScanNodeTest {
                     e.getMessage().contains("startSnapshotId is required when 
using snapshot-based incremental read"));
         }
 
-        // 11. Test invalid snapshot ID values (≤ 0)
-        params.clear();
-        params.put("startSnapshotId", "0");
-        try {
-            PaimonScanNode.validateIncrementalReadParams(params);
-            Assert.fail("Should throw exception for startSnapshotId ≤ 0");
-        } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be 
greater than 0"));
-        }
-
+        // 11. Test invalid snapshot ID values (< 0)
         params.clear();
         params.put("startSnapshotId", "-1");
         try {
             PaimonScanNode.validateIncrementalReadParams(params);
-            Assert.fail("Should throw exception for negative startSnapshotId");
+            Assert.fail("Should throw exception for startSnapshotId < 0");
         } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be 
greater than 0"));
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be 
greater than or equal to 0"));
         }
 
         params.clear();
         params.put("startSnapshotId", "1");
-        params.put("endSnapshotId", "0");
+        params.put("endSnapshotId", "-1");
         try {
             PaimonScanNode.validateIncrementalReadParams(params);
-            Assert.fail("Should throw exception for endSnapshotId ≤ 0");
+            Assert.fail("Should throw exception for endSnapshotId < 0");
         } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("endSnapshotId must be 
greater than 0"));
+            Assert.assertTrue(e.getMessage().contains("endSnapshotId must be 
greater than or equal to 0"));
         }
 
-        // 12. Test start ≥ end for snapshot IDs
+        // 12. Test start > end for snapshot IDs
         params.clear();
-        params.put("startSnapshotId", "5");
+        params.put("startSnapshotId", "6");
         params.put("endSnapshotId", "5");
         try {
             PaimonScanNode.validateIncrementalReadParams(params);
-            Assert.fail("Should throw exception when startSnapshotId = 
endSnapshotId");
+            Assert.fail("Should throw exception when startSnapshotId > 
endSnapshotId");
         } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be 
less than endSnapshotId"));
+            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be 
less than or equal to endSnapshotId"));
         }
 
+        // 12.1. Test startSnapshotId == endSnapshotId (should be allowed, 
consistent with Spark Paimon behavior)
         params.clear();
-        params.put("startSnapshotId", "6");
+        params.put("startSnapshotId", "5");
         params.put("endSnapshotId", "5");
-        try {
-            PaimonScanNode.validateIncrementalReadParams(params);
-            Assert.fail("Should throw exception when startSnapshotId > 
endSnapshotId");
-        } catch (UserException e) {
-            Assert.assertTrue(e.getMessage().contains("startSnapshotId must be 
less than endSnapshotId"));
-        }
+        result = PaimonScanNode.validateIncrementalReadParams(params);
+        Assert.assertEquals("5,5", result.get("incremental-between"));
+        Assert.assertTrue(result.containsKey("scan.mode") && 
result.get("scan.mode") == null);
+        Assert.assertEquals(3, result.size());
 
-        // 13. Test invalid timestamp values (≤ 0)
+        // 13. Test invalid timestamp values (< 0)
         params.clear();
         params.put("startTimestamp", "-1");
         try {
diff --git a/regression-test/data/external_table_p0/paimon/paimon_incr_read.out 
b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
index 6d776ec8df8..ad49163cc36 100644
--- a/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
+++ b/regression-test/data/external_table_p0/paimon/paimon_incr_read.out
@@ -32,6 +32,13 @@
 
 -- !scan_mode4 --
 
+-- !snapshot_id_0_0_empty --
+
+-- !snapshot_id_0_1 --
+1      Alice   30
+
+-- !snapshot_id_1_1_empty --
+
 -- !cte --
 Bob    25
 Charlie        28
@@ -73,6 +80,13 @@ Alice        30
 
 -- !scan_mode4 --
 
+-- !snapshot_id_0_0_empty --
+
+-- !snapshot_id_0_1 --
+1      Alice   30
+
+-- !snapshot_id_1_1_empty --
+
 -- !cte --
 Bob    25
 Charlie        28
diff --git 
a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy 
b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
index 08a271e054b..3f5b2accf10 100644
--- a/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
+++ b/regression-test/suites/external_table_p0/paimon/paimon_incr_read.groovy
@@ -54,7 +54,10 @@ suite("test_paimon_incr_read", 
"p0,external,doris,external_docker,external_docke
             order_qt_scan_mode2 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'diff');"""
             order_qt_scan_mode3 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'delta');"""
             order_qt_scan_mode4 """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=2, 
'incrementalBetweenScanMode' = 'changelog');"""
-            
+
+            order_qt_snapshot_id_0_0_empty """select * from 
paimon_incr@incr('startSnapshotId'=0, 'endSnapshotId'=0)"""
+            order_qt_snapshot_id_0_1 """select * from 
paimon_incr@incr('startSnapshotId'=0, 'endSnapshotId'=1)"""
+            order_qt_snapshot_id_1_1_empty """select * from 
paimon_incr@incr('startSnapshotId'=1, 'endSnapshotId'=1)"""
 
             // complex query
             qt_cte """with cte1 as (select * from 
paimon_incr@incr('startTimestamp'=0)) select name, age from cte1 order by 
age;"""
@@ -84,10 +87,6 @@ suite("test_paimon_incr_read", 
"p0,external,doris,external_docker,external_docke
                 sql """select * from paimon_incr@incr('startSnapshotId'=1, 
'endSnapshotId'=2, 'incrementalBetweenScanMode' = 'error');"""
                 exception "incrementalBetweenScanMode must be one of"
             }
-            test {
-                sql """select * from paimon_incr@incr('startSnapshotId'=1, 
'endSnapshotId'=1)"""
-                exception "startSnapshotId must be less than endSnapshotId"
-            }
             test {
                 sql """select * from paimon_incr@incr('startSnapshotId'=1)"""
                 exception "endSnapshotId is required when using snapshot-based 
incremental read"
@@ -96,6 +95,10 @@ suite("test_paimon_incr_read", 
"p0,external,doris,external_docker,external_docke
                 sql """select * from paimon_incr@incr('startSnapshotId'=1, 
'endSnapshotId'=2) for version as of 1"""
                 exception "should not spec both snapshot and scan params"
             }
+            test {
+                sql """select * from paimon_incr@incr('startSnapshotId'=-1)"""
+                exception "startSnapshotId must be greater than or equal to 0"
+            }
         }
 
         test_incr_read("false")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to