This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0391abffb [core] Introduce incremental-between read (#1437)
0391abffb is described below
commit 0391abffb068c39171491a472845e7a18bd44116
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 27 15:17:55 2023 +0800
[core] Introduce incremental-between read (#1437)
---
docs/content/how-to/cdc-ingestion.md | 2 +-
docs/content/how-to/lookup-joins.md | 2 +-
docs/content/how-to/querying-tables.md | 315 ++++++++-------------
.../{querying-tables.md => system-tables.md} | 164 +----------
.../shortcodes/generated/core_configuration.html | 8 +-
.../main/java/org/apache/paimon/CoreOptions.java | 36 ++-
.../org/apache/paimon/schema/SchemaValidation.java | 26 +-
.../table/source/AbstractInnerTableScan.java | 8 +
.../snapshot/IncrementalStartingScanner.java | 79 ++++++
.../table/source/snapshot/SnapshotReader.java | 6 +
.../table/source/snapshot/SnapshotReaderImpl.java | 10 +
.../apache/paimon/table/system/AuditLogTable.java | 11 +
.../apache/paimon/table/IncrementalTableTest.java | 150 ++++++++++
.../org/apache/paimon/table/TableTestBase.java | 115 ++++++++
.../apache/paimon/flink/BatchFileStoreITCase.java | 2 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 2 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 2 +-
.../org/apache/paimon/flink/FlinkCatalogTest.java | 6 +
18 files changed, 581 insertions(+), 363 deletions(-)
diff --git a/docs/content/how-to/cdc-ingestion.md
b/docs/content/how-to/cdc-ingestion.md
index a9c76b37d..3533d9906 100644
--- a/docs/content/how-to/cdc-ingestion.md
+++ b/docs/content/how-to/cdc-ingestion.md
@@ -1,6 +1,6 @@
---
title: "CDC Ingestion"
-weight: 7
+weight: 8
type: docs
aliases:
- /how-to/cdc-ingestion.html
diff --git a/docs/content/how-to/lookup-joins.md
b/docs/content/how-to/lookup-joins.md
index a2ea277c8..f7253e0db 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -1,6 +1,6 @@
---
title: "Lookup Joins"
-weight: 6
+weight: 7
type: docs
aliases:
- /how-to/lookup-joins.html
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 26535316b..83a9bb5cc 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -28,104 +28,40 @@ under the License.
Just like all other tables, Paimon tables can be queried with `SELECT`
statement.
-## Scan Mode
-
-By specifying the `scan.mode` table property, users can specify where and how
Paimon sources should produce records.
-
-<table class="table table-bordered">
-<thead>
-<tr>
-<th>Scan Mode</th>
-<th>Batch Source Behavior</th>
-<th>Streaming Source Behavior</th>
-</tr>
-</thead>
-<tbody>
-<tr>
-<td>default</td>
-<td colspan="2">
-The default scan mode. Determines actual scan mode according to other table
properties. If "scan.timestamp-millis" is set the actual scan mode will be
"from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will
be "from-snapshot". Otherwise the actual scan mode will be "latest-full".
-</td>
-</tr>
-<tr>
-<td>latest-full</td>
-<td>
-Produces the latest snapshot of table.
-</td>
-<td>
-Produces the latest snapshot on the table upon first startup, and continues to
read the following changes.
-</td>
-</tr>
-<tr>
-<td>compacted-full</td>
-<td>
-Produces the snapshot after the latest <a href="{{< ref
"concepts/file-layouts#compaction" >}}">compaction</a>.
-</td>
-<td>
-Produces the snapshot after the latest compaction on the table upon first
startup, and continues to read the following changes.
-</td>
-</tr>
-<tr>
-<td>latest</td>
-<td>Same as "latest-full"</td>
-<td>Continuously reads latest changes without producing a snapshot at the
beginning.</td>
-</tr>
-<tr>
-<td>from-timestamp</td>
-<td>Produces a snapshot earlier than or equals to the timestamp specified by
"scan.timestamp-millis".</td>
-<td>Continuously reads changes starting from timestamp specified by
"scan.timestamp-millis", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Continuously reads changes starting from a snapshot specified by
"scan.snapshot-id", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot-full</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Produces from snapshot specified by "scan.snapshot-id" on the table upon
first startup, and continuously reads changes.</td>
-</tr>
-</tbody>
-</table>
-
-Users can also adjust `changelog-producer` table property to specify the
pattern of produced changes. See [changelog producer]({{< ref
"concepts/primary-key-table#changelog-producers" >}}) for details.
-
-{{< img src="/img/scan-mode.png">}}
-
-## Time Travel
-
-Currently, Paimon supports time travel for Flink and Spark 3 (requires Spark
3.3+).
+## Batch Query
+
+Paimon's batch read returns all the data in a snapshot of the table. By
default, batch reads return the latest snapshot.
+
+### Batch Time Travel
+
+Paimon batch reads with time travel can specify a snapshot and read the
corresponding data.
{{< tabs "time-travel-example" >}}
{{< tab "Flink" >}}
-****
-you can use [dynamic table
options](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options)
to specify scan mode and from where to start:
-
```sql
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot' by default
+-- read the snapshot with id 1L
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot-full'
-SELECT * FROM t /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
-
--- travel to specified timestamp with a long value in milliseconds
+-- read the snapshot from specified timestamp in unix milliseconds
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
```
{{< /tab >}}
{{< tab "Spark3" >}}
+Requires Spark 3.3+.
+
you can use `VERSION AS OF` and `TIMESTAMP AS OF` in query to do time travel:
```sql
--- travel to snapshot with id 1L (use snapshot id as version)
+-- read the snapshot with id 1L (use snapshot id as version)
SELECT * FROM t VERSION AS OF 1;
--- travel to specified timestamp
+-- read the snapshot from specified timestamp
SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
--- you can also use a long value in seconds as timestamp
+-- read the snapshot from specified timestamp in unix seconds
SELECT * FROM t TIMESTAMP AS OF 1678883047;
```
@@ -134,7 +70,7 @@ SELECT * FROM t TIMESTAMP AS OF 1678883047;
{{< tab "Trino" >}}
```sql
--- travel to specified timestamp with a long value in milliseconds
+-- read the snapshot from specified timestamp with a long value in unix
milliseconds
SET SESSION paimon.scan_timestamp_millis=1679486589444;
SELECT * FROM t;
```
@@ -143,7 +79,78 @@ SELECT * FROM t;
{{< /tabs >}}
-## Consumer ID
+### Batch Incremental
+
+Read incremental changes between start snapshot (exclusive) and end snapshot.
+
+For example, '5,10' means changes between snapshot 5 and snapshot 10.
+
+{{< tabs "incremental-example" >}}
+
+{{< tab "Flink" >}}
+```sql
+SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
+```
+{{< /tab >}}
+
+{{< tab "Spark" >}}
+
+```java
+spark.read()
+ .format("paimon")
+ .option("incremental-between", "12,20")
+ .load("path/to/table")
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Streaming Query
+
+By default, Streaming read produces the latest snapshot on the table upon
first startup,
+and continues to read the latest changes.
+
+By default, Paimon ensures that your startup is properly processed, with the
full amount of data
+included.
+
+You can also do a streaming read without the snapshot data by using the `latest`
scan mode:
+
+{{< tabs "latest streaming read" >}}
+{{< tab "Flink" >}}
+```sql
+-- Continuously reads latest changes without producing a snapshot at the
beginning.
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Streaming Time Travel
+
+If you only want to process data for today and beyond, you can do so with
partition filters:
+
+```sql
+SELECT * FROM t WHERE dt > '2023-06-26'
+```
+
+If it's not a partitioned table, or you can't filter by partition, you can use
a streaming read with time travel.
+
+{{< tabs "streaming-time-travel" >}}
+{{< tab "Flink" >}}
+```sql
+-- read changes from snapshot id 1L
+SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
+
+-- read changes from snapshot specified timestamp
+SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
+
+-- read snapshot id 1L upon first startup, and continue to read the changes
+SELECT * FROM t /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Consumer ID
{{< hint info >}}
This is an experimental feature.
@@ -168,136 +175,56 @@ NOTE: The consumer will prevent expiration of the
snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}
-## System Tables
+## Query Optimization
-System tables contain metadata and information about each table, such as the
snapshots created and the options in use. Users can access system tables with
batch queries.
+{{< label Batch >}}{{< label Streaming >}}
-Currently, Flink, Spark and Trino supports querying system tables.
+It is highly recommended to specify partition and primary key filters
+along with the query, which will speed up the data skipping of the query.
-In some cases, the table name needs to be enclosed with back quotes to avoid
syntax parsing conflicts, for example triple access mode:
-```sql
-SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
-```
-
-### Snapshots Table
-
-You can query the snapshot history information of the table through snapshots
table, including the record count occurred in the snapshot.
-
-```sql
-SELECT * FROM MyTable$snapshots;
-
-/*
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+
-| snapshot_id | schema_id | commit_user | commit_identifier |
commit_kind | commit_time | total_record_count |
delta_record_count | changelog_record_count |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+
-| 2 | 0 | 7ca4cd28-98e... | 2 |
APPEND | 2022-10-26 11:44:15.600 | 2 | 2 |
0 |
-| 1 | 0 | 870062aa-3e9... | 1 |
APPEND | 2022-10-26 11:44:15.148 | 1 | 1 |
0 |
-+--------------+------------+-----------------+-------------------+--------------+-------------------------+---------------------+---------------------+-------------------------+
-2 rows in set
-*/
-```
+The filter functions that can accelerate data skipping are:
+- `=`
+- `<`
+- `<=`
+- `>`
+- `>=`
+- `IN (...)`
+- `LIKE 'abc%'`
+- `IS NULL`
-By querying the snapshots table, you can know the commit and expiration
information about that table and time travel through the data.
+Paimon will sort the data by primary key, which speeds up the point queries
+and range queries. When using a composite primary key, it is best for the query
+filters to form a [leftmost
prefix](https://dev.mysql.com/doc/refman/5.7/en/multiple-column-indexes.html)
+of the primary key for good acceleration.
-### Schemas Table
-
-You can query the historical schemas of the table through schemas table.
+Suppose that a table has the following specification:
```sql
-SELECT * FROM MyTable$schemas;
-
-/*
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-| schema_id | fields | partition_keys | primary_keys |
options | comment |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-| 0 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | |
-| 1 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | |
-| 2 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-3 rows in set
-*/
+CREATE TABLE orders (
+ catalog_id BIGINT,
+ order_id BIGINT,
+ .....,
+ PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
+)
```
-You can join the snapshots table and schemas table to get the fields of given
snapshots.
+The query obtains a good acceleration by specifying a range filter for
+the leftmost prefix of the primary key.
```sql
-SELECT s.snapshot_id, t.schema_id, t.fields
- FROM MyTable$snapshots s JOIN MyTable$schemas t
- ON s.schema_id=t.schema_id where s.snapshot_id=100;
-```
+SELECT * FROM orders WHERE catalog_id=1025;
-### Options Table
+SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
-You can query the table's option information which is specified from the DDL
through options table. The options not shown will be the default value. You can
take reference to [Configuration].
-
-```sql
-SELECT * FROM MyTable$options;
-
-/*
-+------------------------+--------------------+
-| key | value |
-+------------------------+--------------------+
-| snapshot.time-retained | 5 h |
-+------------------------+--------------------+
-1 rows in set
-*/
+SELECT * FROM orders
+ WHERE catalog_id=1025
+ AND order_id>2035 AND order_id<6000;
```
-### Audit log Table
-
-If you need to audit the changelog of the table, you can use the `audit_log`
system table. Through `audit_log` table, you can get the `rowkind` column when
you get the incremental data of the table. You can use this column for
-filtering and other operations to complete the audit.
-
-There are four values for `rowkind`:
-
-- `+I`: Insertion operation.
-- `-U`: Update operation with the previous content of the updated row.
-- `+U`: Update operation with new content of the updated row.
-- `-D`: Deletion operation.
+However, the following filter cannot accelerate the query well.
```sql
-SELECT * FROM MyTable$audit_log;
-
-/*
-+------------------+-----------------+-----------------+
-| rowkind | column_0 | column_1 |
-+------------------+-----------------+-----------------+
-| +I | ... | ... |
-+------------------+-----------------+-----------------+
-| -U | ... | ... |
-+------------------+-----------------+-----------------+
-| +U | ... | ... |
-+------------------+-----------------+-----------------+
-3 rows in set
-*/
-```
+SELECT * FROM orders WHERE order_id=29495;
-### Files Table
-You can query the files of the table with specific snapshot.
-
-```
--- Query the files of latest snapshot
-SELECT * FROM MyTable$files;
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-| partition | bucket | file_path | file_format |
schema_id | level | record_count | file_size_in_bytes | min_key | max_key |
null_value_counts | min_value_stats | max_value_stats |
creation_time |
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-| [3] | 0 | data-8f64af95-29cc-4342-adc... | orc |
0 | 0 | 1 | 593 | [c] | [c] | {cnt=0,
val=0, word=0} | {cnt=3, val=33, word=c} | {cnt=3, val=33, word=c}
|2023-02-24T16:06:21.166|
-| [2] | 0 | data-8b369068-0d37-4011-aa5... | orc |
0 | 0 | 1 | 593 | [b] | [b] | {cnt=0,
val=0, word=0} | {cnt=2, val=22, word=b} | {cnt=2, val=22, word=b}
|2023-02-24T16:06:21.166|
-| [2] | 0 | data-83aa7973-060b-40b6-8c8... | orc |
0 | 0 | 1 | 605 | [d] | [d] | {cnt=0,
val=0, word=0} | {cnt=2, val=32, word=d} | {cnt=2, val=32, word=d}
|2023-02-24T16:06:21.166|
-| [5] | 0 | data-3d304f4a-bcea-44dc-a13... | orc |
0 | 0 | 1 | 593 | [c] | [c] | {cnt=0,
val=0, word=0} | {cnt=5, val=51, word=c} | {cnt=5, val=51, word=c}
|2023-02-24T16:06:21.166|
-| [1] | 0 | data-10abb5bc-0170-43ae-b6a... | orc |
0 | 0 | 1 | 595 | [a] | [a] | {cnt=0,
val=0, word=0} | {cnt=1, val=11, word=a} | {cnt=1, val=11, word=a}
|2023-02-24T16:06:21.166|
-| [4] | 0 | data-2c9b7095-65b7-4013-a7a... | orc |
0 | 0 | 1 | 593 | [a] | [a] | {cnt=0,
val=0, word=0} | {cnt=4, val=12, word=a} | {cnt=4, val=12, word=a}
|2023-02-24T16:06:21.166|
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-6 rows in set
-
--- You can also query the files with specific snapshot
-SELECT * FROM MyTable$files /*+ OPTIONS('scan.snapshot-id'='1') */;
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-| partition | bucket | file_path | file_format |
schema_id | level | record_count | file_size_in_bytes | min_key | max_key |
null_value_counts | min_value_stats | max_value_stats |
creation_time |
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-| [3] | 0 | data-8f64af95-29cc-4342-adc... | orc |
0 | 0 | 1 | 593 | [c] | [c] | {cnt=0,
val=0, word=0} | {cnt=3, val=33, word=c} | {cnt=3, val=33, word=c}
|2023-02-24T16:06:21.166|
-| [2] | 0 | data-8b369068-0d37-4011-aa5... | orc |
0 | 0 | 1 | 593 | [b] | [b] | {cnt=0,
val=0, word=0} | {cnt=2, val=22, word=b} | {cnt=2, val=22, word=b}
|2023-02-24T16:06:21.166|
-| [1] | 0 | data-10abb5bc-0170-43ae-b6a... | orc |
0 | 0 | 1 | 595 | [a] | [a] | {cnt=0,
val=0, word=0} | {cnt=1, val=11, word=a} | {cnt=1, val=11, word=a}
|2023-02-24T16:06:21.166|
-+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
-3 rows in set
+SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
```
\ No newline at end of file
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/system-tables.md
similarity index 65%
copy from docs/content/how-to/querying-tables.md
copy to docs/content/how-to/system-tables.md
index 26535316b..c5d9e565b 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -1,9 +1,9 @@
---
-title: "Querying Tables"
-weight: 5
+title: "System Tables"
+weight: 6
type: docs
aliases:
-- /how-to/querying-tables.html
+- /how-to/system-tables.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -24,151 +24,7 @@ specific language governing permissions and limitations
under the License.
-->
-# Querying Tables
-
-Just like all other tables, Paimon tables can be queried with `SELECT`
statement.
-
-## Scan Mode
-
-By specifying the `scan.mode` table property, users can specify where and how
Paimon sources should produce records.
-
-<table class="table table-bordered">
-<thead>
-<tr>
-<th>Scan Mode</th>
-<th>Batch Source Behavior</th>
-<th>Streaming Source Behavior</th>
-</tr>
-</thead>
-<tbody>
-<tr>
-<td>default</td>
-<td colspan="2">
-The default scan mode. Determines actual scan mode according to other table
properties. If "scan.timestamp-millis" is set the actual scan mode will be
"from-timestamp", and if "scan.snapshot-id" is set the actual startup mode will
be "from-snapshot". Otherwise the actual scan mode will be "latest-full".
-</td>
-</tr>
-<tr>
-<td>latest-full</td>
-<td>
-Produces the latest snapshot of table.
-</td>
-<td>
-Produces the latest snapshot on the table upon first startup, and continues to
read the following changes.
-</td>
-</tr>
-<tr>
-<td>compacted-full</td>
-<td>
-Produces the snapshot after the latest <a href="{{< ref
"concepts/file-layouts#compaction" >}}">compaction</a>.
-</td>
-<td>
-Produces the snapshot after the latest compaction on the table upon first
startup, and continues to read the following changes.
-</td>
-</tr>
-<tr>
-<td>latest</td>
-<td>Same as "latest-full"</td>
-<td>Continuously reads latest changes without producing a snapshot at the
beginning.</td>
-</tr>
-<tr>
-<td>from-timestamp</td>
-<td>Produces a snapshot earlier than or equals to the timestamp specified by
"scan.timestamp-millis".</td>
-<td>Continuously reads changes starting from timestamp specified by
"scan.timestamp-millis", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Continuously reads changes starting from a snapshot specified by
"scan.snapshot-id", without producing a snapshot at the beginning.</td>
-</tr>
-<tr>
-<td>from-snapshot-full</td>
-<td>Produces a snapshot specified by "scan.snapshot-id".</td>
-<td>Produces from snapshot specified by "scan.snapshot-id" on the table upon
first startup, and continuously reads changes.</td>
-</tr>
-</tbody>
-</table>
-
-Users can also adjust `changelog-producer` table property to specify the
pattern of produced changes. See [changelog producer]({{< ref
"concepts/primary-key-table#changelog-producers" >}}) for details.
-
-{{< img src="/img/scan-mode.png">}}
-
-## Time Travel
-
-Currently, Paimon supports time travel for Flink and Spark 3 (requires Spark
3.3+).
-
-{{< tabs "time-travel-example" >}}
-
-{{< tab "Flink" >}}
-****
-you can use [dynamic table
options](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options)
to specify scan mode and from where to start:
-
-```sql
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot' by default
-SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-
--- travel to snapshot with id 1L with 'scan.mode'='from-snapshot-full'
-SELECT * FROM t /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;
-
--- travel to specified timestamp with a long value in milliseconds
-SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
-```
-{{< /tab >}}
-
-{{< tab "Spark3" >}}
-
-you can use `VERSION AS OF` and `TIMESTAMP AS OF` in query to do time travel:
-
-```sql
--- travel to snapshot with id 1L (use snapshot id as version)
-SELECT * FROM t VERSION AS OF 1;
-
--- travel to specified timestamp
-SELECT * FROM t TIMESTAMP AS OF '2023-06-01 00:00:00.123';
-
--- you can also use a long value in seconds as timestamp
-SELECT * FROM t TIMESTAMP AS OF 1678883047;
-```
-
-{{< /tab >}}
-
-{{< tab "Trino" >}}
-
-```sql
--- travel to specified timestamp with a long value in milliseconds
-SET SESSION paimon.scan_timestamp_millis=1679486589444;
-SELECT * FROM t;
-```
-
-{{< /tab >}}
-
-{{< /tabs >}}
-
-## Consumer ID
-
-{{< hint info >}}
-This is an experimental feature.
-{{< /hint >}}
-
-You can specify the `consumer-id` when streaming read table:
-```sql
-SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
-```
-
-When stream read Paimon tables, the next snapshot id to be recorded into the
file system. This has several advantages:
-
-1. When previous job is stopped, the newly started job can continue to consume
from the previous progress without
- resuming from the state. The newly reading will start reading from next
snapshot id found in consumer files.
-2. When deciding whether a snapshot has expired, Paimon looks at all the
consumers of the table in the file system,
- and if there are consumers that still depend on this snapshot, then this
snapshot will not be deleted by expiration.
-3. When there is no watermark definition, the Paimon table will pass the
watermark in the snapshot to the downstream
- Paimon table, which means you can track the progress of the watermark for
the entire pipeline.
-
-{{< hint info >}}
-NOTE: The consumer will prevent expiration of the snapshot. You can specify
'consumer.expiration-time' to manage the
-lifetime of consumers.
-{{< /hint >}}
-
-## System Tables
+# System Tables
System tables contain metadata and information about each table, such as the
snapshots created and the options in use. Users can access system tables with
batch queries.
@@ -179,7 +35,7 @@ In some cases, the table name needs to be enclosed with back
quotes to avoid syn
SELECT * FROM my_catalog.my_db.`MyTable$snapshots`;
```
-### Snapshots Table
+## Snapshots Table
You can query the snapshot history information of the table through snapshots
table, including the record count occurred in the snapshot.
@@ -199,7 +55,7 @@ SELECT * FROM MyTable$snapshots;
By querying the snapshots table, you can know the commit and expiration
information about that table and time travel through the data.
-### Schemas Table
+## Schemas Table
You can query the historical schemas of the table through schemas table.
@@ -226,7 +82,7 @@ SELECT s.snapshot_id, t.schema_id, t.fields
ON s.schema_id=t.schema_id where s.snapshot_id=100;
```
-### Options Table
+## Options Table
You can query the table's option information which is specified from the DDL
through options table. The options not shown will be the default value. You can
take reference to [Configuration].
@@ -243,7 +99,7 @@ SELECT * FROM MyTable$options;
*/
```
-### Audit log Table
+## Audit log Table
If you need to audit the changelog of the table, you can use the `audit_log`
system table. Through `audit_log` table, you can get the `rowkind` column when
you get the incremental data of the table. You can use this column for
filtering and other operations to complete the audit.
@@ -272,7 +128,7 @@ SELECT * FROM MyTable$audit_log;
*/
```
-### Files Table
+## Files Table
You can query the files of the table with specific snapshot.
```
@@ -300,4 +156,4 @@ SELECT * FROM MyTable$files /*+
OPTIONS('scan.snapshot-id'='1') */;
| [1] | 0 | data-10abb5bc-0170-43ae-b6a... | orc |
0 | 0 | 1 | 595 | [a] | [a] | {cnt=0,
val=0, word=0} | {cnt=1, val=11, word=a} | {cnt=1, val=11, word=a}
|2023-02-24T16:06:21.166|
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+-----------------------+
3 rows in set
-```
\ No newline at end of file
+```
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index bf4265e27..f64f3d104 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@ under the License.
<td>Integer</td>
<td>Full compaction will be constantly triggered after delta
commits.</td>
</tr>
+ <tr>
+ <td><h5>incremental-between</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Read incremental changes between start snapshot (exclusive)
and end snapshot, for example, '5,10' means changes between snapshot 5 and
snapshot 10.</td>
+ </tr>
<tr>
<td><h5>local-sort.max-num-file-handles</h5></td>
<td style="word-wrap: break-word;">128</td>
@@ -342,7 +348,7 @@ under the License.
<td><h5>scan.mode</h5></td>
<td style="word-wrap: break-word;">default</td>
<td><p>Enum</p></td>
- <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
+ <td>Specify the scanning behavior of the source.<br /><br
/>Possible values:<ul><li>"default": Determines actual startup mode according
to other table properties. If "scan.timestamp-millis" is set the actual startup
mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is
set the actual startup mode will be "from-snapshot". Otherwise the actual
startup mode will be "latest-full".</li><li>"latest-full": For streaming
sources, produces the latest snapshot [...]
</tr>
<tr>
<td><h5>scan.plan-sort-partition</h5></td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index d8ce5747f..3b56c3daf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -30,6 +30,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
import java.io.Serializable;
@@ -684,6 +685,14 @@ public class CoreOptions implements Serializable {
+ " related to the number of initialized
bucket, too small will lead to"
+ " insufficient processing speed of
assigner.");
+ public static final ConfigOption<String> INCREMENTAL_BETWEEN =
+ key("incremental-between")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Read incremental changes between start snapshot
(exclusive) and end snapshot, "
+ + "for example, '5,10' means changes
between snapshot 5 and snapshot 10.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -904,6 +913,8 @@ public class CoreOptions implements Serializable {
} else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
|| options.getOptional(SCAN_TAG_NAME).isPresent()) {
return StartupMode.FROM_SNAPSHOT;
+ } else if (options.getOptional(INCREMENTAL_BETWEEN).isPresent()) {
+ return StartupMode.INCREMENTAL;
} else {
return StartupMode.LATEST_FULL;
}
@@ -930,6 +941,22 @@ public class CoreOptions implements Serializable {
return options.get(SCAN_TAG_NAME);
}
+ public Pair<String, String> incrementalBetween() {
+ String str = options.get(INCREMENTAL_BETWEEN);
+ if (str == null) {
+ return null;
+ }
+
+ String[] split = str.split(",");
+ if (split.length != 2) {
+ throw new IllegalArgumentException(
+ "The incremental-between must specific start snapshot
(exclusive) and end snapshot,"
+ + " for example, '5,10' means changes between
snapshot 5 and snapshot 10. But is: "
+ + str);
+ }
+ return Pair.of(split[0], split[1]);
+ }
+
public Integer scanManifestParallelism() {
return options.get(SCAN_MANIFEST_PARALLELISM);
}
@@ -1071,7 +1098,10 @@ public class CoreOptions implements Serializable {
"from-snapshot-full",
"For streaming sources, produces from snapshot specified by
\"scan.snapshot-id\" "
+ "on the table upon first startup, and continuously
reads changes. For batch sources, "
- + "produces a snapshot specified by
\"scan.snapshot-id\" but does not read new changes.");
+ + "produces a snapshot specified by
\"scan.snapshot-id\" but does not read new changes."),
+
+ INCREMENTAL(
+ "incremental", "Read incremental changes between start
snapshot and end snapshot.");
private final String value;
private final String description;
@@ -1327,6 +1357,10 @@ public class CoreOptions implements Serializable {
if (options.contains(SCAN_SNAPSHOT_ID) &&
!options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
}
+
+ if (options.contains(INCREMENTAL_BETWEEN) &&
!options.contains(SCAN_MODE)) {
+ options.set(SCAN_MODE, StartupMode.INCREMENTAL);
+ }
}
public static List<ConfigOption<?>> getOptions() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index a657019b4..fee830427 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
@@ -190,25 +191,32 @@ public class SchemaValidation {
options, SCAN_TIMESTAMP_MILLIS,
CoreOptions.StartupMode.FROM_TIMESTAMP);
checkOptionsConflict(
options,
- Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME),
+ Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME,
INCREMENTAL_BETWEEN),
Collections.singletonList(SCAN_TIMESTAMP_MILLIS));
} else if (options.startupMode() ==
CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
options, options.startupMode(), SCAN_SNAPSHOT_ID,
SCAN_TAG_NAME);
checkOptionsConflict(
options,
- Collections.singletonList(SCAN_TIMESTAMP_MILLIS),
+ Arrays.asList(SCAN_TIMESTAMP_MILLIS, INCREMENTAL_BETWEEN),
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
+ } else if (options.startupMode() ==
CoreOptions.StartupMode.INCREMENTAL) {
+ checkOptionExistInMode(options, INCREMENTAL_BETWEEN,
options.startupMode());
+ checkOptionsConflict(
+ options,
+ Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS,
SCAN_TAG_NAME),
+ Collections.singletonList(INCREMENTAL_BETWEEN));
} else if (options.startupMode() ==
CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
checkOptionExistInMode(options, SCAN_SNAPSHOT_ID,
options.startupMode());
checkOptionsConflict(
options,
- Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME),
+ Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME,
INCREMENTAL_BETWEEN),
Collections.singletonList(SCAN_SNAPSHOT_ID));
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS,
options.startupMode());
checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID,
options.startupMode());
checkOptionNotExistInMode(options, SCAN_TAG_NAME,
options.startupMode());
+ checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN,
options.startupMode());
}
}
@@ -250,11 +258,13 @@ public class SchemaValidation {
CoreOptions options,
List<ConfigOption<?>> illegalOptions,
List<ConfigOption<?>> legalOptions) {
- checkArgument(
- illegalOptions.stream().noneMatch(op ->
options.toConfiguration().contains(op)),
- "[%s] must be null when you set [%s]",
- concatConfigKeys(illegalOptions),
- concatConfigKeys(legalOptions));
+ for (ConfigOption<?> illegalOption : illegalOptions) {
+ checkArgument(
+ !options.toConfiguration().contains(illegalOption),
+ "[%s] must be null when you set [%s]",
+ illegalOption.key(),
+ concatConfigKeys(legalOptions));
+ }
}
private static String concatConfigKeys(List<ConfigOption<?>>
configOptions) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index f2ed2ea85..3a3d7ee62 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -33,11 +33,13 @@ import
org.apache.paimon.table.source.snapshot.ContinuousFromTimestampStartingSc
import org.apache.paimon.table.source.snapshot.ContinuousLatestStartingScanner;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
+import org.apache.paimon.table.source.snapshot.IncrementalStartingScanner;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
+import org.apache.paimon.utils.Pair;
import java.util.List;
import java.util.Optional;
@@ -130,6 +132,12 @@ public abstract class AbstractInnerTableScan implements
InnerTableScan {
return isStreaming
? new
ContinuousFromSnapshotFullStartingScanner(options.scanSnapshotId())
: new
StaticFromSnapshotStartingScanner(options.scanSnapshotId());
+ case INCREMENTAL:
+ checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
+ Pair<String, String> incremental =
options.incrementalBetween();
+ return new IncrementalStartingScanner(
+ Long.parseLong(incremental.getLeft()),
+ Long.parseLong(incremental.getRight()));
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
new file mode 100644
index 000000000..b1fc7ffba
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.ScanKind;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** {@link StartingScanner} for incremental changes. */
+public class IncrementalStartingScanner implements StartingScanner {
+
+ private final long start;
+ private final long end;
+
+ public IncrementalStartingScanner(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public Result scan(SnapshotManager manager, SnapshotReader reader) {
+ Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new
HashMap<>();
+ for (long i = start + 1; i < end + 1; i++) {
+ List<DataSplit> splits = readDeltaSplits(reader,
manager.snapshot(i));
+ for (DataSplit split : splits) {
+ grouped.computeIfAbsent(
+ Pair.of(split.partition(), split.bucket()), k
-> new ArrayList<>())
+ .addAll(split.files());
+ }
+ }
+
+ List<DataSplit> result = new ArrayList<>();
+ for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry :
grouped.entrySet()) {
+ BinaryRow partition = entry.getKey().getLeft();
+ int bucket = entry.getKey().getRight();
+ for (List<DataFileMeta> files :
reader.splitGenerator().split(entry.getValue())) {
+ result.add(new DataSplit(end, partition, bucket, files,
false));
+ }
+ }
+
+ return new ScannedResult(end, null, result);
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s)
{
+ if (s.commitKind() != CommitKind.APPEND) {
+ // ignore COMPACT and OVERWRITE
+ return Collections.emptyList();
+ }
+ return (List)
reader.withSnapshot(s).withKind(ScanKind.DELTA).read().splits();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index a602eedbb..3fb8f0bb5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -25,8 +25,10 @@ import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -35,8 +37,12 @@ import java.util.List;
/** Read splits from specified {@link Snapshot} with given configuration. */
public interface SnapshotReader {
+ SnapshotManager snapshotManager();
+
ConsumerManager consumerManager();
+ SplitGenerator splitGenerator();
+
SnapshotReader withSnapshot(long snapshotId);
SnapshotReader withSnapshot(Snapshot snapshot);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 15f4ab72c..19b4097be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -82,11 +82,21 @@ public class SnapshotReaderImpl implements SnapshotReader {
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
}
+ @Override
+ public SnapshotManager snapshotManager() {
+ return snapshotManager;
+ }
+
@Override
public ConsumerManager consumerManager() {
return consumerManager;
}
+ @Override
+ public SplitGenerator splitGenerator() {
+ return splitGenerator;
+ }
+
@Override
public SnapshotReader withSnapshot(long snapshotId) {
scan.withSnapshot(snapshotId);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 8dd94df14..fb1581439 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
@@ -186,11 +187,21 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
this.snapshotReader = snapshotReader;
}
+ @Override
+ public SnapshotManager snapshotManager() {
+ return snapshotReader.snapshotManager();
+ }
+
@Override
public ConsumerManager consumerManager() {
return snapshotReader.consumerManager();
}
+ @Override
+ public SplitGenerator splitGenerator() {
+ return snapshotReader.splitGenerator();
+ }
+
public SnapshotReader withSnapshot(long snapshotId) {
snapshotReader.withSnapshot(snapshotId);
return this;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
new file mode 100644
index 000000000..4a6a09e54
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CoreOptions#INCREMENTAL_BETWEEN}. */
+public class IncrementalTableTest extends TableTestBase {
+
+ @Test
+ public void testPrimaryKeyTable() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1));
+
+ // snapshot 2: append
+ write(
+ table,
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 2));
+
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+
+ // snapshot 4: append
+ write(
+ table,
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1));
+
+ // snapshot 5: append
+ write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4),
GenericRow.of(2, 1, 4));
+
+ // snapshot 6: append
+ write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5),
GenericRow.of(2, 1, 5));
+
+ List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN,
"2,5"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 4),
+ GenericRow.of(1, 2, 4),
+ GenericRow.of(2, 1, 4),
+ GenericRow.of(2, 2, 1));
+ }
+
+ @Test
+ public void testAppendTable() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1));
+
+ // snapshot 2: append
+ write(
+ table,
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 1, 2));
+
+ // snapshot 3: append
+ write(
+ table,
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1));
+
+ // snapshot 4: append
+ write(table, GenericRow.of(1, 1, 4), GenericRow.of(1, 2, 4),
GenericRow.of(2, 1, 4));
+
+ // snapshot 5: append
+ write(table, GenericRow.of(1, 1, 5), GenericRow.of(1, 2, 5),
GenericRow.of(2, 1, 5));
+
+ List<InternalRow> result = read(table, Pair.of(INCREMENTAL_BETWEEN,
"2,4"));
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ GenericRow.of(1, 1, 3),
+ GenericRow.of(1, 2, 3),
+ GenericRow.of(2, 1, 3),
+ GenericRow.of(2, 2, 1),
+ GenericRow.of(1, 1, 4),
+ GenericRow.of(1, 2, 4),
+ GenericRow.of(2, 1, 4));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
new file mode 100644
index 000000000..9f5b32514
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for table. */
+public abstract class TableTestBase {
+
+ @TempDir java.nio.file.Path tempPath;
+
+ protected Path warehouse;
+ protected Catalog catalog;
+ protected String database;
+
+ @BeforeEach
+ public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+ database = "default";
+ warehouse = new Path(TraceableFileIO.SCHEME + "://" +
tempPath.toString());
+ catalog =
CatalogFactory.createCatalog(CatalogContext.create(warehouse));
+ catalog.createDatabase(database, true);
+ }
+
+ @AfterEach
+ public void after() throws IOException {
+ // assert all connections are closed
+ Predicate<Path> pathPredicate = path ->
path.toString().contains(tempPath.toString());
+ assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+ assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+ }
+
+ protected Identifier identifier(String tableName) {
+ return new Identifier(database, tableName);
+ }
+
+ protected void write(Table table, InternalRow... rows) throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ for (InternalRow row : rows) {
+ write.write(row);
+ }
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ protected void compact(Table table, BinaryRow partition, int bucket)
throws Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.compact(partition, bucket, true);
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ protected List<InternalRow> read(Table table, Pair<ConfigOption<?>,
String>... dynamicOptions)
+ throws Exception {
+ Map<String, String> options = new HashMap<>();
+ for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
+ options.put(pair.getKey().key(), pair.getValue());
+ }
+ table = table.copy(options);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ InternalRowSerializer serializer = new
InternalRowSerializer(table.rowType());
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+ return rows;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 8f6017614..121810f4c 100644
---
a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -127,7 +127,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
time3)))
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
- "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]");
+ "[scan.snapshot-id] must be null when you set [scan.timestamp-millis]");
assertThatThrownBy(
() ->
diff --git
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 8f6017614..121810f4c 100644
---
a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -127,7 +127,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
time3)))
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
- "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]");
+ "[scan.snapshot-id] must be null when you set [scan.timestamp-millis]");
assertThatThrownBy(
() ->
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index a5eb105e5..a5a7acad4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -122,7 +122,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase
{
time3)))
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasRootCauseMessage(
- "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]");
+ "[scan.snapshot-id] must be null when you set [scan.timestamp-millis]");
assertThatThrownBy(
() ->
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 92d2cc9f7..efdcddd3e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -497,6 +497,12 @@ public class FlinkCatalogTest {
options.put("scan.snapshot-id", "1");
} else if (mode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
options.put("scan.timestamp-millis",
System.currentTimeMillis() + "");
+ } else if (mode == CoreOptions.StartupMode.INCREMENTAL) {
+ options.put("incremental-between", "2,5");
+ }
+
+ if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL) {
+ continue;
}
allOptions.add(options);
}