This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0391abffb [core] Introduce incremental-between read (#1437)
0391abffb is described below

commit 0391abffb068c39171491a472845e7a18bd44116
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 27 15:17:55 2023 +0800

    [core] Introduce incremental-between read (#1437)
---
 docs/content/how-to/cdc-ingestion.md               |   2 +-
 docs/content/how-to/lookup-joins.md                |   2 +-
 docs/content/how-to/querying-tables.md             | 315 ++++++++-------------
 .../{querying-tables.md => system-tables.md}       | 164 +----------
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  36 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |  26 +-
 .../table/source/AbstractInnerTableScan.java       |   8 +
 .../snapshot/IncrementalStartingScanner.java       |  79 ++++++
 .../table/source/snapshot/SnapshotReader.java      |   6 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  10 +
 .../apache/paimon/table/system/AuditLogTable.java  |  11 +
 .../apache/paimon/table/IncrementalTableTest.java  | 150 ++++++++++
 .../org/apache/paimon/table/TableTestBase.java     | 115 ++++++++
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   2 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   2 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   2 +-
 .../org/apache/paimon/flink/FlinkCatalogTest.java  |   6 +
 18 files changed, 581 insertions(+), 363 deletions(-)

diff --git a/docs/content/how-to/cdc-ingestion.md 
b/docs/content/how-to/cdc-ingestion.md
index a9c76b37d..3533d9906 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -1,6 +1,6 @@
 ---
 title: "CDC Ingestion"
-weight: 7
+weight: 8
 type: docs
 aliases:
 - /how-to/cdc-ingestion.html
diff --git a/docs/content/how-to/lookup-joins.md 
b/docs/content/how-to/lookup-joins.md
index a2ea277c8..f7253e0db 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -1,6 +1,6 @@
 ---
 title: "Lookup Joins"
-weight: 6
+weight: 7
 type: docs
 aliases:
 - /how-to/lookup-joins.html
diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 26535316b..83a9bb5cc 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -28,104 +28,40 @@ under the License.
 
 Just like all other tables, Paimon tables can be queried with `SELECT` 
statement.
 
-## Scan Mode
-
-By specifying the `scan.mode` table property, users can specify where and how 
Paimon sources should produce records.
-
-<table class="table table-bordered">
-<thead>
-<tr>
-<th>Scan Mode</th>
-<th>Batch Source Behavior</th>
-<th>Streaming Source Behavior</th>
-</tr>
-</thead>
-<tbody>
-<tr>
-<td>default</td>
-<td colspan="2">
-The default scan mode. Determines actual scan mode according to other table 
properties. If "scan.timestamp-millis" is set the actual scan mode will be 
"from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will 
be "from-snapshot". Otherwise the actual scan mode will be "latest-full".
-</td>
-</tr>
-<tr>
-<td>latest-full</td>
-<td>
-Produces the latest snapshot of table.
-</td>
-<td>
-Produces the latest snapshot on the table upon first startup, and continues to 
read the following changes.
-</td>
-</tr>
-<tr>
-<td>compacted-full</td>
-<td>
-Produces the snapshot after the latest <a href="{{< ref 
"concepts/file-layouts#compaction" >}}">compaction</a>.
-</td>
-<td>
-Produces the snapshot after the latest compaction on the table upon first 
startup, and continues to read the following changes.
-</td>
-</tr>
-<tr>
-<td>latest</td>
-<td>Same as "latest-full"</td>
-<td>Continuously reads latest changes without producing a snapshot at the 
beginning.</td>
-</tr>
-<tr>
-<td>from-timestamp</td>
-<td>Produces a snapshot earlier than or equals to the timestamp specified by 
"scan.timestamp-millis".</td>
-<td>Continuously reads changes starting from timestamp specified by 
"scan.timestamp-millis", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Continuously reads changes starting from a snapshot specified by 
"scan.snapshot-id", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot-full</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Produces from snapshot specified by "scan.snapshot-id" on the table upon 
first startup, and continuously reads changes.</td>
-</tr>
-</tbody>
-</table>
-
-Users can also adjust `changelog-producer` table property to specify the 
pattern of produced changes. See [changelog producer]({{< ref 
"concepts/primary-key-table#changelog-producers" >}}) for details.
-
-{{< img src="/img/scan-mode.png">}}
-
-## Time Travel
-
-Currently, Paimon supports time travel for Flink and Spark 3 (requires Spark 
3.3+).
+## Batch Query
+
+Paimon's batch read returns all the data in a snapshot of the table. By 
default, batch reads return the latest snapshot.
+
+### Batch Time Travel
+
+Paimon batch reads with time travel can specify a snapshot and read the 
corresponding data.
 
 {{< tabs "time-travel-example" >}}
 
 {{< tab "Flink" >}}
-****
-you can use [dynamic table 
options](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options)
 to specify scan mode and from where to start:
-
 ```sql
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot' by default
+-- read the snapshot with id 1L
 SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
 
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot-full'
-SELECT * FROM t /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
-
--- travel to specified timestamp with a long value in milliseconds
+-- read the snapshot from specified timestamp in unix milliseconds
 SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
 ```
 {{< /tab >}}
 
 {{< tab "Spark3" >}}
 
+Requires Spark 3.3+.
+
 you can use `VERSION AS OF` and `TIMESTAMP AS OF` in query to do time travel:
 
 ```sql
--- travel to snapshot with id 1L (use snapshot id as version)
+-- read the snapshot with id 1L (use snapshot id as version)
 SELECT * FROM t VERSION AS OF 1;
 
--- travel to specified timestamp 
+-- read the snapshot from specified timestamp 
 SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
 
--- you can also use a long value in seconds as timestamp
+-- read the snapshot from specified timestamp in unix seconds
 SELECT * FROM t TIMESTAMP AS OF 1678883047;
 ```
 
@@ -134,7 +70,7 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
 {{< tab "Trino" >}}
 
 ```sql
--- travel to specified timestamp with a long value in milliseconds
+-- read the snapshot from specified timestamp with a long value in unix 
milliseconds
 SET SESSION paimon.scan_timestamp_millis=1679486589444;
 SELECT * FROM t;
 ```
@@ -143,7 +79,78 @@ SELECT * FROM t;
 
 {{< /tabs >}}
 
-## Consumer ID
+### Batch Incremental
+
+Read incremental changes between start snapshot (exclusive) and end snapshot.
+
+For example, '5,10' means changes between snapshot 5 and snapshot 10.
+
+{{< tabs "incremental-example" >}}
+
+{{< tab "Flink" >}}
+```sql
+SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
+```
+{{< /tab >}}
+
+{{< tab "Spark" >}}
+
+```java
+spark.read()
+  .format("paimon")
+  .option("incremental-between", "12,20")
+  .load("path/to/table")
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Streaming Query
+
+By default, Streaming read produces the latest snapshot on the table upon 
first startup,
+and continues to read the latest changes.
+
+By default, Paimon ensures that the first startup is processed correctly, 
with the full data
+of the latest snapshot included.
+
+You can also do streaming read without the snapshot data by using the `latest` 
scan mode:
+
+{{< tabs "latest streaming read" >}}
+{{< tab "Flink" >}}
+```sql
+-- Continuously reads latest changes without producing a snapshot at the 
beginning.
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Streaming Time Travel
+
+If you only want to process data for today and beyond, you can do so with 
partition filters:
+
+```sql
+SELECT * FROM t WHERE dt > '2023-06-26'
+```
+
+If it's not a partitioned table, or you can't filter by partition, you can use 
streaming read with time travel.
+
+{{< tabs "streaming-time-travel" >}}
+{{< tab "Flink" >}}
+```sql
+-- read changes from snapshot id 1L 
+SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
+
+-- read changes from snapshot specified timestamp
+SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
+
+-- read snapshot id 1L upon first startup, and continue to read the changes
+SELECT * FROM t /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Consumer ID
 
 {{< hint info >}}
 This is an experimental feature.
@@ -168,136 +175,56 @@ NOTE: The consumer will prevent expiration of the 
snapshot. You can specify 'con
 lifetime of consumers.
 {{< /hint >}}
 
-## System Tables
+## Query Optimization
 
-System tables contain metadata and information about each table, such as the 
snapshots created and the options in use. Users can access system tables with 
batch queries.
+{{< label Batch >}}{{< label Streaming >}}
 
-Currently, Flink, Spark and Trino supports querying system tables.
+It is highly recommended to specify partition and primary key filters
+along with the query, which will speed up the data skipping of the query.
 
-In some cases, the table name needs to be enclosed with back quotes to avoid 
syntax parsing conflicts, for example triple access mode:
-```sql
-SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
-```
-
-### Snapshots Table
-
-You can query the snapshot history information of the table through snapshots 
table, including the record count occurred in the snapshot.
-
-```sql
-SELECT * FROM MyTable$snapshots;
-
-/*
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+
-|  snapshot_id |  schema_id |     commit_user | commit_identifier |  
commit_kind |             commit_time |  total_record_count |  
delta_record_count |  changelog_record_count |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+
-|            2 |          0 | 7ca4cd28-98e... |                 2 |       
APPEND | 2022-10-26 11:44:15.600 |                   2 |                   2 |  
                     0 |
-|            1 |          0 | 870062aa-3e9... |                 1 |       
APPEND | 2022-10-26 11:44:15.148 |                   1 |                   1 |  
                     0 |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+
-2 rows in set
-*/
-```
+The filter functions that can accelerate data skipping are:
+- `=`
+- `<`
+- `<=`
+- `>`
+- `>=`
+- `IN (...)`
+- `LIKE 'abc%'`
+- `IS NULL`
 
-By querying the snapshots table, you can know the commit and expiration 
information about that table and time travel through the data.
+Paimon will sort the data by primary key, which speeds up the point queries
+and range queries. When using a composite primary key, it is best for the query
+filters to form a [leftmost 
prefix](https://dev.mysql.com/doc/refman/5.7/en/multiple-column-indexes.html)
+of the primary key for good acceleration.
 
-### Schemas Table
-
-You can query the historical schemas of the table through schemas table.
+Suppose that a table has the following specification:
 
 ```sql
-SELECT * FROM MyTable$schemas;
-
-/*
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-| schema_id |                         fields | partition_keys | primary_keys | 
options | comment |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-|         0 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         |
-|         1 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         |
-|         2 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-3 rows in set
-*/
+CREATE TABLE orders (
+    catalog_id BIGINT,
+    order_id BIGINT,
+    .....,
+    PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
+)
 ```
 
-You can join the snapshots table and schemas table to get the fields of given 
snapshots.
+The query obtains a good acceleration by specifying a range filter for
+the leftmost prefix of the primary key.
 
 ```sql
-SELECT s.snapshot_id, t.schema_id, t.fields 
-    FROM MyTable$snapshots s JOIN MyTable$schemas t 
-    ON s.schema_id=t.schema_id where s.snapshot_id=100;
-```
+SELECT * FROM orders WHERE catalog_id=1025;
 
-### Options Table
+SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
 
-You can query the table's option information which is specified from the DDL 
through options table. The options not shown will be the default value. You can 
take reference to  [Configuration].
-
-```sql
-SELECT * FROM MyTable$options;
-
-/*
-+------------------------+--------------------+
-|         key            |        value       |
-+------------------------+--------------------+
-| snapshot.time-retained |         5 h        |
-+------------------------+--------------------+
-1 rows in set
-*/
+SELECT * FROM orders
+  WHERE catalog_id=1025
+  AND order_id>2035 AND order_id<6000;
 ```
 
-### Audit log Table
-
-If you need to audit the changelog of the table, you can use the `audit_log` 
system table. Through `audit_log` table, you can get the `rowkind` column when 
you get the incremental data of the table. You can use this column for
-filtering and other operations to complete the audit.
-
-There are four values for `rowkind`:
-
-- `+I`: Insertion operation.
-- `-U`: Update operation with the previous content of the updated row.
-- `+U`: Update operation with new content of the updated row.
-- `-D`: Deletion operation.
+However, the following filter cannot accelerate the query well.
 
 ```sql
-SELECT * FROM MyTable$audit_log;
-
-/*
-+------------------+-----------------+-----------------+
-|     rowkind      |     column_0    |     column_1    |
-+------------------+-----------------+-----------------+
-|        +I        |      ...        |      ...        |
-+------------------+-----------------+-----------------+
-|        -U        |      ...        |      ...        |
-+------------------+-----------------+-----------------+
-|        +U        |      ...        |      ...        |
-+------------------+-----------------+-----------------+
-3 rows in set
-*/
-```
+SELECT * FROM orders WHERE order_id=29495;
 
-### Files Table
-You can query the files of the table with specific snapshot.
-
-```
--- Query the files of latest snapshot
-SELECT * FROM MyTable$files;
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-| partition | bucket |                      file_path | file_format | 
schema_id | level | record_count | file_size_in_bytes | min_key | max_key |     
 null_value_counts |         min_value_stats |         max_value_stats |        
 creation_time |
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-|       [3] |      0 | data-8f64af95-29cc-4342-adc... |         orc |         
0 |     0 |            1 |                593 |     [c] |     [c] | {cnt=0, 
val=0, word=0} | {cnt=3, val=33, word=c} | {cnt=3, val=33, word=c} 
|2023-02-24T16:06:21.166|
-|       [2] |      0 | data-8b369068-0d37-4011-aa5... |         orc |         
0 |     0 |            1 |                593 |     [b] |     [b] | {cnt=0, 
val=0, word=0} | {cnt=2, val=22, word=b} | {cnt=2, val=22, word=b} 
|2023-02-24T16:06:21.166|
-|       [2] |      0 | data-83aa7973-060b-40b6-8c8... |         orc |         
0 |     0 |            1 |                605 |     [d] |     [d] | {cnt=0, 
val=0, word=0} | {cnt=2, val=32, word=d} | {cnt=2, val=32, word=d} 
|2023-02-24T16:06:21.166|
-|       [5] |      0 | data-3d304f4a-bcea-44dc-a13... |         orc |         
0 |     0 |            1 |                593 |     [c] |     [c] | {cnt=0, 
val=0, word=0} | {cnt=5, val=51, word=c} | {cnt=5, val=51, word=c} 
|2023-02-24T16:06:21.166|
-|       [1] |      0 | data-10abb5bc-0170-43ae-b6a... |         orc |         
0 |     0 |            1 |                595 |     [a] |     [a] | {cnt=0, 
val=0, word=0} | {cnt=1, val=11, word=a} | {cnt=1, val=11, word=a} 
|2023-02-24T16:06:21.166|
-|       [4] |      0 | data-2c9b7095-65b7-4013-a7a... |         orc |         
0 |     0 |            1 |                593 |     [a] |     [a] | {cnt=0, 
val=0, word=0} | {cnt=4, val=12, word=a} | {cnt=4, val=12, word=a} 
|2023-02-24T16:06:21.166|
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-6 rows in set
-
--- You can also query the files with specific snapshot
-SELECT * FROM MyTable$files /*+ OPTIONS('scan.snapshot-id'='1') */;
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-| partition | bucket |                      file_path | file_format | 
schema_id | level | record_count | file_size_in_bytes | min_key | max_key |     
 null_value_counts |         min_value_stats |         max_value_stats |        
 creation_time |
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-|       [3] |      0 | data-8f64af95-29cc-4342-adc... |         orc |         
0 |     0 |            1 |                593 |     [c] |     [c] | {cnt=0, 
val=0, word=0} | {cnt=3, val=33, word=c} | {cnt=3, val=33, word=c} 
|2023-02-24T16:06:21.166|
-|       [2] |      0 | data-8b369068-0d37-4011-aa5... |         orc |         
0 |     0 |            1 |                593 |     [b] |     [b] | {cnt=0, 
val=0, word=0} | {cnt=2, val=22, word=b} | {cnt=2, val=22, word=b} 
|2023-02-24T16:06:21.166|
-|       [1] |      0 | data-10abb5bc-0170-43ae-b6a... |         orc |         
0 |     0 |            1 |                595 |     [a] |     [a] | {cnt=0, 
val=0, word=0} | {cnt=1, val=11, word=a} | {cnt=1, val=11, word=a} 
|2023-02-24T16:06:21.166|
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-3 rows in set
+SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
 ```
\ No newline at end of file
diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/system-tables.md
similarity index 65%
copy from docs/content/how-to/querying-tables.md
copy to docs/content/how-to/system-tables.md
index 26535316b..c5d9e565b 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -1,9 +1,9 @@
 ---
-title: "Querying Tables"
-weight: 5
+title: "System Tables"
+weight: 6
 type: docs
 aliases:
-- /how-to/querying-tables.html
+- /how-to/system-tables.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,151 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Querying Tables
-
-Just like all other tables, Paimon tables can be queried with `SELECT` 
statement.
-
-## Scan Mode
-
-By specifying the `scan.mode` table property, users can specify where and how 
Paimon sources should produce records.
-
-<table class="table table-bordered">
-<thead>
-<tr>
-<th>Scan Mode</th>
-<th>Batch Source Behavior</th>
-<th>Streaming Source Behavior</th>
-</tr>
-</thead>
-<tbody>
-<tr>
-<td>default</td>
-<td colspan="2">
-The default scan mode. Determines actual scan mode according to other table 
properties. If "scan.timestamp-millis" is set the actual scan mode will be 
"from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will 
be "from-snapshot". Otherwise the actual scan mode will be "latest-full".
-</td>
-</tr>
-<tr>
-<td>latest-full</td>
-<td>
-Produces the latest snapshot of table.
-</td>
-<td>
-Produces the latest snapshot on the table upon first startup, and continues to 
read the following changes.
-</td>
-</tr>
-<tr>
-<td>compacted-full</td>
-<td>
-Produces the snapshot after the latest <a href="{{< ref 
"concepts/file-layouts#compaction" >}}">compaction</a>.
-</td>
-<td>
-Produces the snapshot after the latest compaction on the table upon first 
startup, and continues to read the following changes.
-</td>
-</tr>
-<tr>
-<td>latest</td>
-<td>Same as "latest-full"</td>
-<td>Continuously reads latest changes without producing a snapshot at the 
beginning.</td>
-</tr>
-<tr>
-<td>from-timestamp</td>
-<td>Produces a snapshot earlier than or equals to the timestamp specified by 
"scan.timestamp-millis".</td>
-<td>Continuously reads changes starting from timestamp specified by 
"scan.timestamp-millis", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Continuously reads changes starting from a snapshot specified by 
"scan.snapshot-id", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot-full</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Produces from snapshot specified by "scan.snapshot-id" on the table upon 
first startup, and continuously reads changes.</td>
-</tr>
-</tbody>
-</table>
-
-Users can also adjust `changelog-producer` table property to specify the 
pattern of produced changes. See [changelog producer]({{< ref 
"concepts/primary-key-table#changelog-producers" >}}) for details.
-
-{{< img src="/img/scan-mode.png">}}
-
-## Time Travel
-
-Currently, Paimon supports time travel for Flink and Spark 3 (requires Spark 
3.3+).
-
-{{< tabs "time-travel-example" >}}
-
-{{< tab "Flink" >}}
-****
-you can use [dynamic table 
options](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options)
 to specify scan mode and from where to start:
-
-```sql
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot' by default
-SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot-full'
-SELECT * FROM t /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
-
--- travel to specified timestamp with a long value in milliseconds
-SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-```
-{{< /tab >}}
-
-{{< tab "Spark3" >}}
-
-you can use `VERSION AS OF` and `TIMESTAMP AS OF` in query to do time travel:
-
-```sql
--- travel to snapshot with id 1L (use snapshot id as version)
-SELECT * FROM t VERSION AS OF 1;
-
--- travel to specified timestamp 
-SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-
--- you can also use a long value in seconds as timestamp
-SELECT * FROM t TIMESTAMP AS OF 1678883047;
-```
-
-{{< /tab >}}
-
-{{< tab "Trino" >}}
-
-```sql
--- travel to specified timestamp with a long value in milliseconds
-SET SESSION paimon.scan_timestamp_millis=1679486589444;
-SELECT * FROM t;
-```
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-## Consumer ID
-
-{{< hint info >}}
-This is an experimental feature.
-{{< /hint >}}
-
-You can specify the `consumer-id` when streaming read table:
-```sql
-SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
-```
-
-When stream read Paimon tables, the next snapshot id to be recorded into the 
file system. This has several advantages:
-
-1. When previous job is stopped, the newly started job can continue to consume 
from the previous progress without
-   resuming from the state. The newly reading will start reading from next 
snapshot id found in consumer files.
-2. When deciding whether a snapshot has expired, Paimon looks at all the 
consumers of the table in the file system,
-   and if there are consumers that still depend on this snapshot, then this 
snapshot will not be deleted by expiration.
-3. When there is no watermark definition, the Paimon table will pass the 
watermark in the snapshot to the downstream
-   Paimon table, which means you can track the progress of the watermark for 
the entire pipeline.
-
-{{< hint info >}}
-NOTE: The consumer will prevent expiration of the snapshot. You can specify 
'consumer.expiration-time' to manage the 
-lifetime of consumers.
-{{< /hint >}}
-
-## System Tables
+# System Tables
 
 System tables contain metadata and information about each table, such as the 
snapshots created and the options in use. Users can access system tables with 
batch queries.
 
@@ -179,7 +35,7 @@ In some cases, the table name needs to be enclosed with back 
quotes to avoid syn
 SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
 ```
 
-### Snapshots Table
+## Snapshots Table
 
 You can query the snapshot history information of the table through snapshots 
table, including the record count occurred in the snapshot.
 
@@ -199,7 +55,7 @@ SELECT * FROM MyTable$snapshots;
 
 By querying the snapshots table, you can know the commit and expiration 
information about that table and time travel through the data.
 
-### Schemas Table
+## Schemas Table
 
 You can query the historical schemas of the table through schemas table.
 
@@ -226,7 +82,7 @@ SELECT s.snapshot_id, t.schema_id, t.fields
     ON s.schema_id=t.schema_id where s.snapshot_id=100;
 ```
 
-### Options Table
+## Options Table
 
 You can query the table's option information which is specified from the DDL 
through options table. The options not shown will be the default value. You can 
take reference to  [Configuration].
 
@@ -243,7 +99,7 @@ SELECT * FROM MyTable$options;
 */
 ```
 
-### Audit log Table
+## Audit log Table
 
 If you need to audit the changelog of the table, you can use the `audit_log` 
system table. Through `audit_log` table, you can get the `rowkind` column when 
you get the incremental data of the table. You can use this column for
 filtering and other operations to complete the audit.
@@ -272,7 +128,7 @@ SELECT * FROM MyTable$audit_log;
 */
 ```
 
-### Files Table
+## Files Table
 You can query the files of the table with specific snapshot.
 
 ```
@@ -300,4 +156,4 @@ SELECT * FROM MyTable$files /*+ 
OPTIONS('scan.snapshot-id'='1') */;
 |       [1] |      0 | data-10abb5bc-0170-43ae-b6a... |         orc |         
0 |     0 |            1 |                595 |     [a] |     [a] | {cnt=0, 
val=0, word=0} | {cnt=1, val=11, word=a} | {cnt=1, val=11, word=a} 
|2023-02-24T16:06:21.166|
 
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
 3 rows in set
-```
\ No newline at end of file
+```
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index bf4265e27..f64f3d104 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@ under the License.
             <td>Integer</td>
             <td>Full compaction will be constantly triggered after delta 
commits.</td>
         </tr>
+        <tr>
+            <td><h5>incremental-between</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Read incremental changes between start snapshot (exclusive) 
and end snapshot, for example, '5,10' means changes between snapshot 5 and 
snapshot 10.</td>
+        </tr>
         <tr>
             <td><h5>local-sort.max-num-file-handles</h5></td>
             <td style="word-wrap: break-word;">128</td>
@@ -342,7 +348,7 @@ under the License.
             <td><h5>scan.mode</h5></td>
             <td style="word-wrap: break-word;">default</td>
             <td><p>Enum</p></td>
-            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
+            <td>Specify the scanning behavior of the source.<br /><br 
/>Possible values:<ul><li>"default": Determines actual startup mode according 
to other table properties. If "scan.timestamp-millis" is set the actual startup 
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is 
set the actual startup mode will be "from-snapshot". Otherwise the actual 
startup mode will be "latest-full".</li><li>"latest-full": For streaming 
sources, produces the latest snapshot  [...]
         </tr>
         <tr>
             <td><h5>scan.plan-sort-partition</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index d8ce5747f..3b56c3daf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -30,6 +30,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.options.description.DescribedEnum;
 import org.apache.paimon.options.description.Description;
 import org.apache.paimon.options.description.InlineElement;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
 import java.io.Serializable;
@@ -684,6 +685,14 @@ public class CoreOptions implements Serializable {
                                     + " related to the number of initialized 
bucket, too small will lead to"
                                     + " insufficient processing speed of 
assigner.");
 
+    public static final ConfigOption<String> INCREMENTAL_BETWEEN =
+            key("incremental-between")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Read incremental changes between start snapshot 
(exclusive) and end snapshot, "
+                                    + "for example, '5,10' means changes 
between snapshot 5 and snapshot 10.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -904,6 +913,8 @@ public class CoreOptions implements Serializable {
             } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
                     || options.getOptional(SCAN_TAG_NAME).isPresent()) {
                 return StartupMode.FROM_SNAPSHOT;
+            } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()) {
+                return StartupMode.INCREMENTAL;
             } else {
                 return StartupMode.LATEST_FULL;
             }
@@ -930,6 +941,22 @@ public class CoreOptions implements Serializable {
         return options.get(SCAN_TAG_NAME);
     }
 
+    /**
+     * Parses the {@code incremental-between} option into its start and end entries.
+     *
+     * <p>The value must consist of exactly two comma-separated, non-empty entries, for example
+     * {@code '5,10'}. Whitespace around each entry is stripped, so {@code '5, 10'} is accepted as
+     * well; the scan planner feeds both entries to {@code Long.parseLong}, which rejects
+     * surrounding blanks.
+     *
+     * @return the (start, end) pair as strings, or {@code null} if the option is not set
+     * @throws IllegalArgumentException if the value is not made of two non-empty entries
+     */
+    public Pair<String, String> incrementalBetween() {
+        String str = options.get(INCREMENTAL_BETWEEN);
+        if (str == null) {
+            return null;
+        }
+
+        String[] split = str.split(",");
+        if (split.length == 2) {
+            String start = split[0].trim();
+            String end = split[1].trim();
+            // An empty bound (e.g. ",5") is reported here rather than as a bare
+            // NumberFormatException when the bounds are parsed later on.
+            if (!start.isEmpty() && !end.isEmpty()) {
+                return Pair.of(start, end);
+            }
+        }
+
+        throw new IllegalArgumentException(
+                "The incremental-between must specify start snapshot (exclusive) and end snapshot,"
+                        + " for example, '5,10' means changes between snapshot 5 and snapshot 10. But is: "
+                        + str);
+    }
+
     public Integer scanManifestParallelism() {
         return options.get(SCAN_MANIFEST_PARALLELISM);
     }
@@ -1071,7 +1098,10 @@ public class CoreOptions implements Serializable {
                 "from-snapshot-full",
                 "For streaming sources, produces from snapshot specified by 
\"scan.snapshot-id\" "
                         + "on the table upon first startup, and continuously 
reads changes. For batch sources, "
-                        + "produces a snapshot specified by 
\"scan.snapshot-id\" but does not read new changes.");
+                        + "produces a snapshot specified by 
\"scan.snapshot-id\" but does not read new changes."),
+
+        INCREMENTAL(
+                "incremental", "Read incremental changes between start 
snapshot and end snapshot.");
 
         private final String value;
         private final String description;
@@ -1327,6 +1357,10 @@ public class CoreOptions implements Serializable {
         if (options.contains(SCAN_SNAPSHOT_ID) && 
!options.contains(SCAN_MODE)) {
             options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
         }
+
+        if (options.contains(INCREMENTAL_BETWEEN) && 
!options.contains(SCAN_MODE)) {
+            options.set(SCAN_MODE, StartupMode.INCREMENTAL);
+        }
     }
 
     public static List<ConfigOption<?>> getOptions() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index a657019b4..fee830427 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
 import static org.apache.paimon.CoreOptions.SCAN_MODE;
 import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
 import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
@@ -190,25 +191,32 @@ public class SchemaValidation {
                     options, SCAN_TIMESTAMP_MILLIS, 
CoreOptions.StartupMode.FROM_TIMESTAMP);
             checkOptionsConflict(
                     options,
-                    Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME),
+                    Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME, 
INCREMENTAL_BETWEEN),
                     Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_SNAPSHOT) {
             checkExactOneOptionExistInMode(
                     options, options.startupMode(), SCAN_SNAPSHOT_ID, 
SCAN_TAG_NAME);
             checkOptionsConflict(
                     options,
-                    Collections.singletonList(SCAN_TIMESTAMP_MILLIS),
+                    Arrays.asList(SCAN_TIMESTAMP_MILLIS, INCREMENTAL_BETWEEN),
                     Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
+        } else if (options.startupMode() == 
CoreOptions.StartupMode.INCREMENTAL) {
+            checkOptionExistInMode(options, INCREMENTAL_BETWEEN, 
options.startupMode());
+            checkOptionsConflict(
+                    options,
+                    Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS, 
SCAN_TAG_NAME),
+                    Collections.singletonList(INCREMENTAL_BETWEEN));
         } else if (options.startupMode() == 
CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
             checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, 
options.startupMode());
             checkOptionsConflict(
                     options,
-                    Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME),
+                    Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME, 
INCREMENTAL_BETWEEN),
                     Collections.singletonList(SCAN_SNAPSHOT_ID));
         } else {
             checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, 
options.startupMode());
             checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, 
options.startupMode());
             checkOptionNotExistInMode(options, SCAN_TAG_NAME, 
options.startupMode());
+            checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, 
options.startupMode());
         }
     }
 
@@ -250,11 +258,13 @@ public class SchemaValidation {
             CoreOptions options,
             List<ConfigOption<?>> illegalOptions,
             List<ConfigOption<?>> legalOptions) {
-        checkArgument(
-                illegalOptions.stream().noneMatch(op -> 
options.toConfiguration().contains(op)),
-                "[%s] must be null when you set [%s]",
-                concatConfigKeys(illegalOptions),
-                concatConfigKeys(legalOptions));
+        for (ConfigOption<?> illegalOption : illegalOptions) {
+            checkArgument(
+                    !options.toConfiguration().contains(illegalOption),
+                    "[%s] must be null when you set [%s]",
+                    illegalOption.key(),
+                    concatConfigKeys(legalOptions));
+        }
     }
 
     private static String concatConfigKeys(List<ConfigOption<?>> 
configOptions) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index f2ed2ea85..3a3d7ee62 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -33,11 +33,13 @@ import 
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingSc
 import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
 import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.utils.Pair;
 
 import java.util.List;
 import java.util.Optional;
@@ -130,6 +132,12 @@ public abstract class AbstractInnerTableScan implements 
InnerTableScan {
                 return isStreaming
                         ? new 
ContinuousFromSnapshotFullStartingScanner(options.scanSnapshotId())
                         : new 
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+            case INCREMENTAL:
+                checkArgument(!isStreaming, "Cannot read incremental in 
streaming mode.");
+                Pair<String, String> incremental = 
options.incrementalBetween();
+                return new IncrementalStartingScanner(
+                        Long.parseLong(incremental.getLeft()),
+                        Long.parseLong(incremental.getRight()));
             default:
                 throw new UnsupportedOperationException(
                         "Unknown startup mode " + startupMode.name());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
new file mode 100644
index 000000000..b1fc7ffba
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link StartingScanner} that plans the delta files of all snapshots after {@code start} up to
+ * and including {@code end}.
+ */
+public class IncrementalStartingScanner implements StartingScanner {
+
+    private final long start;
+    private final long end;
+
+    public IncrementalStartingScanner(long start, long end) {
+        this.start = start;
+        this.end = end;
+    }
+
+    @Override
+    public Result scan(SnapshotManager manager, SnapshotReader reader) {
+        // Step 1: gather the delta files of every snapshot in the range, keyed by
+        // (partition, bucket), so files of the same bucket end up in the same splits.
+        Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> filesPerBucket = new HashMap<>();
+        long endExclusive = end + 1;
+        for (long snapshotId = start + 1; snapshotId < endExclusive; snapshotId++) {
+            for (DataSplit delta : readDeltaSplits(reader, manager.snapshot(snapshotId))) {
+                Pair<BinaryRow, Integer> bucketKey = Pair.of(delta.partition(), delta.bucket());
+                List<DataFileMeta> bucketFiles = filesPerBucket.get(bucketKey);
+                if (bucketFiles == null) {
+                    bucketFiles = new ArrayList<>();
+                    filesPerBucket.put(bucketKey, bucketFiles);
+                }
+                bucketFiles.addAll(delta.files());
+            }
+        }
+
+        // Step 2: let the reader's split generator cut each bucket's files into splits. Every
+        // split is tagged with the end snapshot id.
+        List<DataSplit> planned = new ArrayList<>();
+        for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> perBucket :
+                filesPerBucket.entrySet()) {
+            Pair<BinaryRow, Integer> bucketKey = perBucket.getKey();
+            BinaryRow partition = bucketKey.getLeft();
+            int bucket = bucketKey.getRight();
+            List<List<DataFileMeta>> groups = reader.splitGenerator().split(perBucket.getValue());
+            for (List<DataFileMeta> group : groups) {
+                planned.add(new DataSplit(end, partition, bucket, group, false));
+            }
+        }
+
+        return new ScannedResult(end, null, planned);
+    }
+
+    /** Reads the delta splits of one snapshot; only APPEND snapshots contribute. */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot snapshot) {
+        if (snapshot.commitKind() == CommitKind.APPEND) {
+            return (List) reader.withSnapshot(snapshot).withKind(ScanKind.DELTA).read().splits();
+        }
+        // COMPACT and OVERWRITE snapshots are ignored
+        return Collections.emptyList();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index a602eedbb..3fb8f0bb5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -25,8 +25,10 @@ import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
@@ -35,8 +37,12 @@ import java.util.List;
 /** Read splits from specified {@link Snapshot} with given configuration. */
 public interface SnapshotReader {
 
+    SnapshotManager snapshotManager();
+
     ConsumerManager consumerManager();
 
+    SplitGenerator splitGenerator();
+
     SnapshotReader withSnapshot(long snapshotId);
 
     SnapshotReader withSnapshot(Snapshot snapshot);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 15f4ab72c..19b4097be 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -82,11 +82,21 @@ public class SnapshotReaderImpl implements SnapshotReader {
         this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
     }
 
+    /** Returns the snapshot manager held by this reader. */
+    @Override
+    public SnapshotManager snapshotManager() {
+        return snapshotManager;
+    }
+
     @Override
     public ConsumerManager consumerManager() {
         return consumerManager;
     }
 
+    /** Returns the split generator held by this reader. */
+    @Override
+    public SplitGenerator splitGenerator() {
+        return splitGenerator;
+    }
+
     @Override
     public SnapshotReader withSnapshot(long snapshotId) {
         scan.withSnapshot(snapshotId);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 8dd94df14..fb1581439 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
@@ -186,11 +187,21 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             this.snapshotReader = snapshotReader;
         }
 
+        /** Delegates to the wrapped snapshot reader. */
+        @Override
+        public SnapshotManager snapshotManager() {
+            return snapshotReader.snapshotManager();
+        }
+
         @Override
         public ConsumerManager consumerManager() {
             return snapshotReader.consumerManager();
         }
 
+        /** Delegates to the wrapped snapshot reader. */
+        @Override
+        public SplitGenerator splitGenerator() {
+            return snapshotReader.splitGenerator();
+        }
+
         public SnapshotReader withSnapshot(long snapshotId) {
             snapshotReader.withSnapshot(snapshotId);
             return this;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
new file mode 100644
index 000000000..4a6a09e54
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests batch reads that are bounded through {@link CoreOptions#INCREMENTAL_BETWEEN}. */
+public class IncrementalTableTest extends TableTestBase {
+
+    @Test
+    public void testPrimaryKeyTable() throws Exception {
+        Table table =
+                createTable(
+                        Schema.newBuilder()
+                                .column("pt", DataTypes.INT())
+                                .column("pk", DataTypes.INT())
+                                .column("col1", DataTypes.INT())
+                                .partitionKeys("pt")
+                                .primaryKey("pk", "pt")
+                                .build());
+
+        // snapshot 1: append
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1));
+
+        // snapshot 2: append
+        write(
+                table,
+                GenericRow.of(1, 1, 2),
+                GenericRow.of(1, 2, 2),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(2, 1, 2));
+
+        // snapshot 3: compact
+        compact(table, row(1), 0);
+
+        // snapshot 4: append
+        write(
+                table,
+                GenericRow.of(1, 1, 3),
+                GenericRow.of(1, 2, 3),
+                GenericRow.of(2, 1, 3),
+                GenericRow.of(2, 2, 1));
+
+        // snapshot 5: append
+        write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4), GenericRow.of(2, 1, 4));
+
+        // snapshot 6: append
+        write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5), GenericRow.of(2, 1, 5));
+
+        // "2,5" covers snapshots 3, 4 and 5. The expected rows carry one value per key:
+        // the one written last within that range.
+        List<InternalRow> rows = read(table, Pair.of(INCREMENTAL_BETWEEN, "2,5"));
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 4),
+                        GenericRow.of(1, 2, 4),
+                        GenericRow.of(2, 1, 4),
+                        GenericRow.of(2, 2, 1));
+    }
+
+    @Test
+    public void testAppendTable() throws Exception {
+        Table table =
+                createTable(
+                        Schema.newBuilder()
+                                .column("pt", DataTypes.INT())
+                                .column("pk", DataTypes.INT())
+                                .column("col1", DataTypes.INT())
+                                .partitionKeys("pt")
+                                .build());
+
+        // snapshot 1: append
+        write(
+                table,
+                GenericRow.of(1, 1, 1),
+                GenericRow.of(1, 2, 1),
+                GenericRow.of(1, 3, 1),
+                GenericRow.of(2, 1, 1));
+
+        // snapshot 2: append
+        write(
+                table,
+                GenericRow.of(1, 1, 2),
+                GenericRow.of(1, 2, 2),
+                GenericRow.of(1, 4, 1),
+                GenericRow.of(2, 1, 2));
+
+        // snapshot 3: append
+        write(
+                table,
+                GenericRow.of(1, 1, 3),
+                GenericRow.of(1, 2, 3),
+                GenericRow.of(2, 1, 3),
+                GenericRow.of(2, 2, 1));
+
+        // snapshot 4: append
+        write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4), GenericRow.of(2, 1, 4));
+
+        // snapshot 5: append
+        write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5), GenericRow.of(2, 1, 5));
+
+        // "2,4" covers snapshots 3 and 4; every row written by them is expected back.
+        List<InternalRow> rows = read(table, Pair.of(INCREMENTAL_BETWEEN, "2,4"));
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 1, 3),
+                        GenericRow.of(1, 2, 3),
+                        GenericRow.of(2, 1, 3),
+                        GenericRow.of(2, 2, 1),
+                        GenericRow.of(1, 1, 4),
+                        GenericRow.of(1, 2, 4),
+                        GenericRow.of(2, 1, 4));
+    }
+
+    /** Creates table "T" with the given schema in the test catalog and loads it. */
+    private Table createTable(Schema schema) throws Exception {
+        Identifier tableId = identifier("T");
+        catalog.createTable(tableId, schema, true);
+        return catalog.getTable(tableId);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
new file mode 100644
index 000000000..9f5b32514
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for table tests. It creates a catalog on a temporary warehouse before each test and
+ * offers helpers to write, compact and read a {@link Table}.
+ */
+public abstract class TableTestBase {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    protected Path warehouse;
+    protected Catalog catalog;
+    protected String database;
+
+    @BeforeEach
+    public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+        database = "default";
+        String warehouseUri = TraceableFileIO.SCHEME + "://" + tempPath.toString();
+        warehouse = new Path(warehouseUri);
+        CatalogContext context = CatalogContext.create(warehouse);
+        catalog = CatalogFactory.createCatalog(context);
+        catalog.createDatabase(database, true);
+    }
+
+    @AfterEach
+    public void after() throws IOException {
+        // No stream opened below the temporary directory may still be open after a test.
+        Predicate<Path> underTempDir = path -> path.toString().contains(tempPath.toString());
+        assertThat(TraceableFileIO.openInputStreams(underTempDir)).isEmpty();
+        assertThat(TraceableFileIO.openOutputStreams(underTempDir)).isEmpty();
+    }
+
+    /** Builds the identifier of {@code tableName} inside the test database. */
+    protected Identifier identifier(String tableName) {
+        return new Identifier(database, tableName);
+    }
+
+    /** Writes the rows and commits them with a single batch commit. */
+    protected void write(Table table, InternalRow... rows) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite writer = builder.newWrite();
+                BatchTableCommit committer = builder.newCommit()) {
+            for (InternalRow each : rows) {
+                writer.write(each);
+            }
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /** Requests a compaction of one bucket and commits its result. */
+    protected void compact(Table table, BinaryRow partition, int bucket) throws Exception {
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite writer = builder.newWrite();
+                BatchTableCommit committer = builder.newCommit()) {
+            writer.compact(partition, bucket, true);
+            committer.commit(writer.prepareCommit());
+        }
+    }
+
+    /** Reads all rows of the table after applying the given options on a copy of it. */
+    protected List<InternalRow> read(Table table, Pair<ConfigOption<?>, String>... dynamicOptions)
+            throws Exception {
+        Map<String, String> overrides = new HashMap<>();
+        for (Pair<ConfigOption<?>, String> option : dynamicOptions) {
+            overrides.put(option.getKey().key(), option.getValue());
+        }
+        Table configured = table.copy(overrides);
+
+        ReadBuilder builder = configured.newReadBuilder();
+        RecordReader<InternalRow> rowReader =
+                builder.newRead().createReader(builder.newScan().plan());
+        InternalRowSerializer serializer = new InternalRowSerializer(configured.rowType());
+        List<InternalRow> collected = new ArrayList<>();
+        // Each row is copied before it is stored (presumably because the reader may reuse row
+        // objects - TODO confirm).
+        rowReader.forEachRemaining(row -> collected.add(serializer.copy(row)));
+        return collected;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 8f6017614..121810f4c 100644
--- 
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -127,7 +127,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                                                 time3)))
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasRootCauseMessage(
-                        "[scan.snapshot-id,scan.tag-name] must be null when 
you set [scan.timestamp-millis]");
+                        "[scan.snapshot-id] must be null when you set 
[scan.timestamp-millis]");
 
         assertThatThrownBy(
                         () ->
diff --git 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 8f6017614..121810f4c 100644
--- 
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -127,7 +127,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                                                 time3)))
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasRootCauseMessage(
-                        "[scan.snapshot-id,scan.tag-name] must be null when 
you set [scan.timestamp-millis]");
+                        "[scan.snapshot-id] must be null when you set 
[scan.timestamp-millis]");
 
         assertThatThrownBy(
                         () ->
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index a5eb105e5..a5a7acad4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -122,7 +122,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
                                                 time3)))
                 .hasRootCauseInstanceOf(IllegalArgumentException.class)
                 .hasRootCauseMessage(
-                        "[scan.snapshot-id,scan.tag-name] must be null when 
you set [scan.timestamp-millis]");
+                        "[scan.snapshot-id] must be null when you set 
[scan.timestamp-millis]");
 
         assertThatThrownBy(
                         () ->
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 92d2cc9f7..efdcddd3e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -497,6 +497,12 @@ public class FlinkCatalogTest {
                 options.put("scan.snapshot-id", "1");
             } else if (mode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
                 options.put("scan.timestamp-millis", 
System.currentTimeMillis() + "");
+            } else if (mode == CoreOptions.StartupMode.INCREMENTAL) {
+                options.put("incremental-between", "2,5");
+            }
+
+            if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL) {
+                continue;
             }
             allOptions.add(options);
         }

Reply via email to