This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 8bda95e1540396d7ec5d305b2041b5f6bc9877f0
Author: umi <[email protected]>
AuthorDate: Fri Oct 17 10:03:44 2025 +0800

    [Python] Introduce incremental-between read by timestamp (#6391)
---
 docs/content/program-api/python-api.md             | 192 ++++++++-
 paimon-python/pypaimon/__init__.py                 |   2 +-
 paimon-python/pypaimon/common/core_options.py      |   1 +
 paimon-python/pypaimon/filesystem/pvfs.py          |   2 +-
 .../pypaimon/manifest/manifest_file_manager.py     |   3 +-
 .../pypaimon/manifest/manifest_list_manager.py     |   3 +
 paimon-python/pypaimon/read/scanner/__init__.py    |  17 +
 .../read/scanner/empty_starting_scanner.py         |  25 ++
 .../full_starting_scanner.py}                      | 151 +------
 .../read/scanner/incremental_starting_scanner.py   |  77 ++++
 .../pypaimon/read/scanner/starting_scanner.py      |  28 ++
 paimon-python/pypaimon/read/table_read.py          |   2 +-
 paimon-python/pypaimon/read/table_scan.py          | 463 ++-------------------
 .../pypaimon/snapshot/snapshot_manager.py          |  55 +++
 paimon-python/pypaimon/table/file_store_table.py   |  17 +
 .../pypaimon/tests/filesystem_catalog_test.py      |   3 +-
 paimon-python/pypaimon/tests/predicates_test.py    |   3 +-
 .../pypaimon/tests/py36/ao_predicate_test.py       |   8 +-
 .../pypaimon/tests/py36/ao_simple_test.py          |   6 +-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py |  53 ++-
 .../pypaimon/tests/reader_append_only_test.py      |  71 +++-
 paimon-python/pypaimon/tests/reader_base_test.py   |  27 +-
 .../pypaimon/tests/reader_primary_key_test.py      |  79 +++-
 .../pypaimon/tests/rest/rest_base_test.py          |   3 +-
 paimon-python/pypaimon/write/file_store_commit.py  |   4 +-
 25 files changed, 670 insertions(+), 625 deletions(-)

diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index 6f7d8bb697..579fce109e 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -286,7 +286,8 @@ for batch in table_read.to_arrow_batch_reader(splits):
 ```
 
 #### Python Iterator
-You can read the data row by row into a native Python iterator. 
+
+You can read the data row by row into a native Python iterator.
 This is convenient for custom row-based processing logic.
 
 ```python
@@ -365,23 +366,177 @@ print(ray_dataset.to_pandas())
 # ...
 ```
 
+### Incremental Read Between Timestamps
+
+This API reads data committed between two snapshot timestamps. The steps are as follows.
+
+- Set the option `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must
+  be a string of the form `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
+- Use `SnapshotManager` to obtain snapshot timestamps, or determine them yourself.
+- Read the data as shown above.
+
+Example:
+
+```python
+from pypaimon import CatalogFactory
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+# Prepare catalog and obtain a table
+catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'})
+table = catalog.get_table('default.your_table_name')
+
+# Assume the table has at least two snapshots (1 and 2)
+snapshot_manager = SnapshotManager(table)
+t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+
+# Read records committed in (t1, t2] (the start timestamp is exclusive)
+table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"})
+
+read_builder = table_inc.new_read_builder()
+table_scan = read_builder.new_scan()
+table_read = read_builder.new_read()
+splits = table_scan.plan().splits()
+
+# To Arrow
+arrow_table = table_read.to_arrow(splits)
+
+# Or to pandas
+pandas_df = table_read.to_pandas(splits)
+```
+
+### Shard Read
+
+Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for
+distributed and parallel processing.
+
+You can specify the shard index and the total number of shards to read a specific portion of the data:
+
+```python
+# Prepare read builder
+table = catalog.get_table('database_name.table_name')
+read_builder = table.new_read_builder()
+table_read = read_builder.new_read()
+
+# Read the second shard (index 1) out of 3 total shards
+splits = read_builder.new_scan().with_shard(1, 3).plan().splits()
+
+# Read all shards and concatenate results
+splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+
+# Combine results from all shards
+all_splits = splits1 + splits2 + splits3
+pa_table = table_read.to_arrow(all_splits)
+```
+
+A complete shard read example:
+
+```python
+import pyarrow as pa
+from pypaimon import CatalogFactory, Schema
+
+# Create catalog
+catalog_options = {'warehouse': 'file:///path/to/warehouse'}
+catalog = CatalogFactory.create(catalog_options)
+catalog.create_database("default", False)
+# Define schema
+pa_schema = pa.schema([
+    ('user_id', pa.int64()),
+    ('item_id', pa.int64()),
+    ('behavior', pa.string()),
+    ('dt', pa.string()),
+])
+
+# Create table and write data
+schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+catalog.create_table('default.test_table', schema, False)
+table = catalog.get_table('default.test_table')
+
+# Write data in two batches
+write_builder = table.new_batch_write_builder()
+
+# First write
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+data1 = {
+    'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14],
+    'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014],
+    'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'],
+    'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
+}
+pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+table_write.write_arrow(pa_table)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# Second write
+table_write = write_builder.new_write()
+table_commit = write_builder.new_commit()
+data2 = {
+    'user_id': [5, 6, 7, 8, 18],
+    'item_id': [1005, 1006, 1007, 1008, 1018],
+    'behavior': ['e', 'f', 'g', 'h', 'z'],
+    'dt': ['p2', 'p1', 'p2', 'p2', 'p1'],
+}
+pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+table_write.write_arrow(pa_table)
+table_commit.commit(table_write.prepare_commit())
+table_write.close()
+table_commit.close()
+
+# Read specific shard
+read_builder = table.new_read_builder()
+table_read = read_builder.new_read()
+
+# Read the third shard (index 2) out of 3 total shards
+splits = read_builder.new_scan().with_shard(2, 3).plan().splits()
+shard_data = table_read.to_arrow(splits)
+
+# Verify shard distribution by reading all shards
+splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits()
+splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits()
+splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits()
+
+# Combining all shards should equal a full table read
+all_shards_data = pa.concat_tables([
+    table_read.to_arrow(splits1),
+    table_read.to_arrow(splits2),
+    table_read.to_arrow(splits3),
+])
+full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits())
+```
+
+Key points about shard read:
+
+- **Shard Index**: Zero-based index of the shard to read (0 to total_shards-1)
+- **Total Shards**: Total number of shards to divide the data into
+- **Data Distribution**: Data is distributed evenly across shards, with remainder rows going to the last shard
+- **Parallel Processing**: Each shard can be processed independently for better performance (see the sketch below)
+- **Consistency**: Combining all shards should produce the complete table data
+
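+Continuing the example above, here is a minimal sketch of processing shards concurrently with `concurrent.futures`.
+It assumes `read_builder` and `table_read` may be shared across worker threads; if that is not safe in your setup,
+create one reader per worker instead:
+
+```python
+from concurrent.futures import ThreadPoolExecutor
+
+import pyarrow as pa
+
+total_shards = 3
+
+def read_one_shard(shard_index):
+    # Each worker plans and reads only its own shard.
+    shard_splits = read_builder.new_scan().with_shard(shard_index, total_shards).plan().splits()
+    return table_read.to_arrow(shard_splits)
+
+with ThreadPoolExecutor(max_workers=total_shards) as executor:
+    shard_tables = list(executor.map(read_one_shard, range(total_shards)))
+
+# Concatenating the per-shard results yields the complete table.
+combined = pa.concat_tables(shard_tables)
+```
+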
 ## Data Types
-| Python Native Type | PyArrow Type | Paimon Type |
-| :--- | :--- | :--- |
-| `int` | `pyarrow.int8()` | `TINYINT` |
-| `int` | `pyarrow.int16()` | `SMALLINT` |
-| `int` | `pyarrow.int32()` | `INT` |
-| `int` | `pyarrow.int64()` | `BIGINT` |
-| `float` | `pyarrow.float32()` | `FLOAT` |
-| `float` | `pyarrow.float64()` | `DOUBLE` |
-| `bool` | `pyarrow.bool_()` | `BOOLEAN` |
-| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
-| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` |
-| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` |
-| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` |
-| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` |
-| `datetime.date` | `pyarrow.date32()` | `DATE` |
-| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` |
+
+| Python Native Type  | PyArrow Type                                     | Paimon Type                       |
+|:--------------------|:-------------------------------------------------|:----------------------------------|
+| `int`               | `pyarrow.int8()`                                 | `TINYINT`                         |
+| `int`               | `pyarrow.int16()`                                | `SMALLINT`                        |
+| `int`               | `pyarrow.int32()`                                | `INT`                             |
+| `int`               | `pyarrow.int64()`                                | `BIGINT`                          |
+| `float`             | `pyarrow.float32()`                              | `FLOAT`                           |
+| `float`             | `pyarrow.float64()`                              | `DOUBLE`                          |
+| `bool`              | `pyarrow.bool_()`                                | `BOOLEAN`                         |
+| `str`               | `pyarrow.string()`                               | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
+| `bytes`             | `pyarrow.binary()`                               | `BYTES`, `VARBINARY(n)`           |
+| `bytes`             | `pyarrow.binary(length)`                         | `BINARY(length)`                  |
+| `decimal.Decimal`   | `pyarrow.decimal128(precision, scale)`           | `DECIMAL(precision, scale)`       |
+| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)`               | `TIMESTAMP(p)`                    |
+| `datetime.date`     | `pyarrow.date32()`                               | `DATE`                            |
+| `datetime.time`     | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)`                         |
 
 ## Predicate
 
@@ -402,5 +557,4 @@ print(ray_dataset.to_pandas())
 | f.contains(literal)   | PredicateBuilder.contains(f, literal)         |
 | f is in [l1, l2]      | PredicateBuilder.is_in(f, [l1, l2])           |
 | f is not in [l1, l2]  | PredicateBuilder.is_not_in(f, [l1, l2])       |
-| lower <= f <= upper   | PredicateBuilder.between(f, lower, upper)     |
-
+| lower <= f <= upper   | PredicateBuilder.between(f, lower, upper)     |
\ No newline at end of file
diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py
index 35a8a72ff1..fc4e7d3b61 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -15,8 +15,8 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema
 
 __version__ = "0.3.dev"
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 754bb17c05..82b788438e 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -45,5 +45,6 @@ class CoreOptions(str, Enum):
     FILE_BLOCK_SIZE = "file.block-size"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
+    INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
     # Commit options
     COMMIT_USER_PREFIX = "commit.user-prefix"
diff --git a/paimon-python/pypaimon/filesystem/pvfs.py b/paimon-python/pypaimon/filesystem/pvfs.py
index d7dabbbcaa..15f9f9f8dc 100644
--- a/paimon-python/pypaimon/filesystem/pvfs.py
+++ b/paimon-python/pypaimon/filesystem/pvfs.py
@@ -29,7 +29,7 @@ from fsspec import AbstractFileSystem
 from fsspec.implementations.local import LocalFileSystem
 from readerwriterlock import rwlock
 
-from pypaimon.api.api_response import GetTableTokenResponse, GetTableResponse
+from pypaimon.api.api_response import GetTableResponse, GetTableTokenResponse
 from pypaimon.api.client import AlreadyExistsException, NoSuchResourceException
 from pypaimon.api.rest_api import RESTApi
 from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index e3c9601cf4..9893b8659a 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,10 +15,11 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import fastavro
 from io import BytesIO
 from typing import List
 
+import fastavro
+
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 2fc1eea011..0fc58652f6 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -47,6 +47,9 @@ class ManifestListManager:
         manifest_files.extend(delta_manifests)
         return manifest_files
 
+    def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
+        return self.read(snapshot.delta_manifest_list)
+
     def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
         manifest_files = []
 
diff --git a/paimon-python/pypaimon/read/scanner/__init__.py b/paimon-python/pypaimon/read/scanner/__init__.py
new file mode 100644
index 0000000000..53ed4d36c2
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/__init__.py
@@ -0,0 +1,17 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
new file mode 100644
index 0000000000..9c870aa588
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py
@@ -0,0 +1,25 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from pypaimon.read.plan import Plan
+from pypaimon.read.scanner.starting_scanner import StartingScanner
+
+
+class EmptyStartingScanner(StartingScanner):
+
+    def scan(self) -> Plan:
+        return Plan([])
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
similarity index 72%
copy from paimon-python/pypaimon/read/table_scan.py
copy to paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index d76725ca97..4275b3f3e2 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -1,21 +1,20 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
 from collections import defaultdict
 from typing import Callable, List, Optional
 
@@ -29,17 +28,15 @@ from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.plan import Plan
 from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
                                            extract_predicate_to_list)
+from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.bucket_mode import BucketMode
 
 
-class TableScan:
-    """Implementation of TableScan for native Python reading."""
-
-    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
-                 read_type: List[DataField]):
+class FullStartingScanner(StartingScanner):
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], read_type: List[DataField]):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -68,16 +65,13 @@ class TableScan:
 
         self.only_read_real_buckets = True if int(
             self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
-        self.data_evolution = self.table.options.get('data-evolution.enabled', 'false').lower() == 'true'
 
-    def plan(self) -> Plan:
+    def scan(self) -> Plan:
         file_entries = self.plan_files()
         if not file_entries:
             return Plan([])
         if self.table.is_primary_key_table:
             splits = self._create_primary_key_splits(file_entries)
-        elif self.data_evolution:
-            splits = self._create_data_evolution_splits(file_entries)
         else:
             splits = self._create_append_only_splits(file_entries)
 
@@ -256,48 +250,6 @@ class TableScan:
             "row_count": file_entry.file.row_count,
         })
 
-    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) 
-> List['Split']:
-        """
-        Create data evolution splits for append-only tables with schema 
evolution.
-        This method groups files by firstRowId and creates splits that can 
handle
-        column merging across different schema versions.
-        """
-        partitioned_files = defaultdict(list)
-        for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
-
-        if self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_row, plan_end_row = 
self._append_only_filter_by_shard(partitioned_files)
-
-        def weight_func(file_list: List[DataFileMeta]) -> int:
-            return max(sum(f.file_size for f in file_list), 
self.open_file_cost)
-
-        splits = []
-        for key, file_entries in partitioned_files.items():
-            if not file_entries:
-                continue
-
-            data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
-            # Split files by firstRowId for data evolution
-            split_by_row_id = self._split_by_row_id(data_files)
-
-            # Pack the split groups for optimal split sizes
-            packed_files: List[List[List[DataFileMeta]]] = 
self._pack_for_ordered(split_by_row_id, weight_func,
-                                                                               
   self.target_split_size)
-
-            # Flatten the packed files and build splits
-            flatten_packed_files: List[List[DataFileMeta]] = [
-                [file for sub_pack in pack for file in sub_pack]
-                for pack in packed_files
-            ]
-
-            splits += self._build_split_from_pack(flatten_packed_files, 
file_entries, False)
-
-        if self.idx_of_this_subtask is not None:
-            self._compute_split_start_end_row(splits, plan_start_row, 
plan_end_row)
-        return splits
-
     def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
         partitioned_files = defaultdict(list)
         for entry in file_entries:
@@ -405,64 +357,3 @@ class TableScan:
             packed.append(bin_items)
 
         return packed
-
-    @staticmethod
-    def _is_blob_file(file_name: str) -> bool:
-        """Check if a file is a blob file based on its extension."""
-        return file_name.endswith('.blob')
-
-    def _split_by_row_id(self, files: List[DataFileMeta]) -> 
List[List[DataFileMeta]]:
-        """
-        Split files by firstRowId for data evolution.
-        This method groups files that have the same firstRowId, which is 
essential
-        for handling schema evolution where files with different schemas need 
to be
-        read together to merge columns.
-        """
-        split_by_row_id = []
-
-        # Sort files by firstRowId and then by maxSequenceNumber
-        # Files with null firstRowId are treated as having Long.MIN_VALUE
-        def sort_key(file: DataFileMeta) -> tuple:
-            first_row_id = file.first_row_id if file.first_row_id is not None 
else float('-inf')
-            is_blob = 1 if self._is_blob_file(file.file_name) else 0
-            # For files with same firstRowId, sort by maxSequenceNumber in 
descending order
-            # (larger sequence number means more recent data)
-            max_seq = file.max_sequence_number
-            return (first_row_id, is_blob, -max_seq)
-
-        sorted_files = sorted(files, key=sort_key)
-
-        # Split files by firstRowId
-        last_row_id = -1
-        check_row_id_start = 0
-        current_split = []
-
-        for file in sorted_files:
-            first_row_id = file.first_row_id
-            if first_row_id is None:
-                # Files without firstRowId are treated as individual splits
-                split_by_row_id.append([file])
-                continue
-
-            if not self._is_blob_file(file.file_name) and first_row_id != 
last_row_id:
-                if current_split:
-                    split_by_row_id.append(current_split)
-
-                # Validate that files don't overlap
-                if first_row_id < check_row_id_start:
-                    file_names = [f.file_name for f in sorted_files]
-                    raise ValueError(
-                        f"There are overlapping files in the split: 
{file_names}, "
-                        f"the wrong file is: {file.file_name}"
-                    )
-
-                current_split = []
-                last_row_id = first_row_id
-                check_row_id_start = first_row_id + file.row_count
-
-            current_split.append(file)
-
-        if current_split:
-            split_by_row_id.append(current_split)
-
-        return split_by_row_id
diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
new file mode 100644
index 0000000000..f1a9cd03bc
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py
@@ -0,0 +1,77 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from typing import List, Optional
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.schema.data_types import DataField
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+
+class IncrementalStartingScanner(FullStartingScanner):
+    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
+                 read_type: List[DataField], start: int, end: int):
+        super().__init__(table, predicate, limit, read_type)
+        self.startingSnapshotId = start
+        self.endingSnapshotId = end
+
+    def plan_files(self) -> List[ManifestEntry]:
+        snapshots_in_range = []
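+        # startingSnapshotId is exclusive and endingSnapshotId is inclusive; only APPEND snapshots are scanned.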
+        for snapshot_id in range(self.startingSnapshotId + 1, self.endingSnapshotId + 1):
+            snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+            if snapshot.commit_kind == "APPEND":
+                snapshots_in_range.append(snapshot)
+
+        # Collect all file entries from all snapshots in range
+        file_entries = []
+
+        for snapshot in snapshots_in_range:
+            # Get manifest files for this snapshot
+            manifest_files = self.manifest_list_manager.read_delta(snapshot)
+
+            # Read all entries from manifest files
+            for manifest_file in manifest_files:
+                entries = self.manifest_file_manager.read(manifest_file.file_name)
+                file_entries.extend(entries)
+        if self.predicate:
+            file_entries = self._filter_by_predicate(file_entries)
+        return file_entries
+
+    @staticmethod
+    def between_timestamps(table, predicate: Optional[Predicate], limit: Optional[int],
+                           read_type: List[DataField], start_timestamp: int,
+                           end_timestamp: int) -> 'IncrementalStartingScanner':
+        """
+        Create an IncrementalStartingScanner for snapshots between two timestamps.
+        """
+        snapshot_manager = SnapshotManager(table)
+        starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp)
+        earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
+
+        # If earliest_snapshot.time_millis > start_timestamp, include the earliest snapshot as well.
+        if starting_snapshot is None or (earliest_snapshot and earliest_snapshot.time_millis > start_timestamp):
+            start_id = earliest_snapshot.id - 1 if earliest_snapshot else -1
+        else:
+            start_id = starting_snapshot.id
+
+        end_snapshot = snapshot_manager.earlier_or_equal_time_mills(end_timestamp)
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if latest_snapshot else -1)
+
+        return IncrementalStartingScanner(table, predicate, limit, read_type, start_id, end_id)
diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py b/paimon-python/pypaimon/read/scanner/starting_scanner.py
new file mode 100644
index 0000000000..7e6cdfd81a
--- /dev/null
+++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py
@@ -0,0 +1,28 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from abc import ABC, abstractmethod
+
+from pypaimon.read.plan import Plan
+
+
+class StartingScanner(ABC):
+    """Helper class for the first planning of TableScan."""
+
+    @abstractmethod
+    def scan(self) -> Plan:
+        """Plan the files to read."""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index b33fb2c6ad..cf32d04d72 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from typing import Iterator, List, Optional, Any
+from typing import Any, Iterator, List, Optional
 
 import pandas
 import pyarrow
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index d76725ca97..93a7babb30 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -16,23 +16,19 @@
 # limitations under the License.
 
################################################################################
 
-from collections import defaultdict
-from typing import Callable, List, Optional
+from typing import List, Optional
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
-from pypaimon.common.predicate_builder import PredicateBuilder
-from pypaimon.manifest.manifest_file_manager import ManifestFileManager
-from pypaimon.manifest.manifest_list_manager import ManifestListManager
-from pypaimon.manifest.schema.data_file_meta import DataFileMeta
-from pypaimon.manifest.schema.manifest_entry import ManifestEntry
-from pypaimon.read.interval_partition import IntervalPartition, SortedRun
+
 from pypaimon.read.plan import Plan
-from pypaimon.read.push_down_utils import (extract_predicate_to_dict,
-                                           extract_predicate_to_list)
-from pypaimon.read.split import Split
+from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.read.scanner.incremental_starting_scanner import \
+    IncrementalStartingScanner
+from pypaimon.read.scanner.starting_scanner import StartingScanner
 from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.bucket_mode import BucketMode
 
 
 class TableScan:
@@ -46,423 +42,36 @@ class TableScan:
         self.predicate = predicate
         self.limit = limit
         self.read_type = read_type
-
-        self.snapshot_manager = SnapshotManager(table)
-        self.manifest_list_manager = ManifestListManager(table)
-        self.manifest_file_manager = ManifestFileManager(table)
-
-        pk_conditions = []
-        trimmed_pk = [field.name for field in 
self.table.table_schema.get_trimmed_primary_key_fields()]
-        extract_predicate_to_list(pk_conditions, self.predicate, trimmed_pk)
-        self.primary_key_predicate = 
PredicateBuilder(self.table.fields).and_predicates(pk_conditions)
-
-        partition_conditions = defaultdict(list)
-        extract_predicate_to_dict(partition_conditions, self.predicate, 
self.table.partition_keys)
-        self.partition_key_predicate = partition_conditions
-
-        self.target_split_size = 128 * 1024 * 1024
-        self.open_file_cost = 4 * 1024 * 1024
-
-        self.idx_of_this_subtask = None
-        self.number_of_para_subtasks = None
-
-        self.only_read_real_buckets = True if int(
-            self.table.options.get('bucket', -1)) == 
BucketMode.POSTPONE_BUCKET.value else False
-        self.data_evolution = self.table.options.get('data-evolution.enabled', 
'false').lower() == 'true'
+        self.starting_scanner = self._create_starting_scanner()
 
     def plan(self) -> Plan:
-        file_entries = self.plan_files()
-        if not file_entries:
-            return Plan([])
-        if self.table.is_primary_key_table:
-            splits = self._create_primary_key_splits(file_entries)
-        elif self.data_evolution:
-            splits = self._create_data_evolution_splits(file_entries)
-        else:
-            splits = self._create_append_only_splits(file_entries)
-
-        splits = self._apply_push_down_limit(splits)
-        return Plan(splits)
-
-    def plan_files(self) -> List[ManifestEntry]:
-        latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-        if not latest_snapshot:
-            return []
-        manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
-
-        deleted_entries = set()
-        added_entries = []
-        # TODO: filter manifest files by predicate
-        for manifest_file in manifest_files:
-            manifest_entries = 
self.manifest_file_manager.read(manifest_file.file_name,
-                                                               lambda row: 
self._bucket_filter(row))
-            for entry in manifest_entries:
-                if entry.kind == 0:
-                    added_entries.append(entry)
-                else:
-                    deleted_entries.add((tuple(entry.partition.values), 
entry.bucket, entry.file.file_name))
-
-        file_entries = [
-            entry for entry in added_entries
-            if (tuple(entry.partition.values), entry.bucket, 
entry.file.file_name) not in deleted_entries
-        ]
-        if self.predicate:
-            file_entries = self._filter_by_predicate(file_entries)
-        return file_entries
+        return self.starting_scanner.scan()
+
+    def _create_starting_scanner(self) -> Optional[StartingScanner]:
+        options = self.table.options
+        if CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP in options:
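+            # Expected value format: "startMillis,endMillis" (start exclusive, end inclusive).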
+            ts = options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP].split(",")
+            if len(ts) != 2:
+                raise ValueError(
+                    "The incremental-between-timestamp must specific 
start(exclusive) and end timestamp. But is: " +
+                    options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP])
+            earliest_snapshot = SnapshotManager(self.table).try_get_earliest_snapshot()
+            latest_snapshot = SnapshotManager(self.table).get_latest_snapshot()
+            if earliest_snapshot is None or latest_snapshot is None:
+                return EmptyStartingScanner()
+            start_timestamp = int(ts[0])
+            end_timestamp = int(ts[1])
+            if start_timestamp >= end_timestamp:
+                raise ValueError(
+                    "Ending timestamp %s should be >= starting timestamp %s." 
% (end_timestamp, start_timestamp))
+            if (start_timestamp == end_timestamp or start_timestamp > 
latest_snapshot.time_millis
+                    or end_timestamp < earliest_snapshot.time_millis):
+                return EmptyStartingScanner()
+            return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit, self.read_type,
+                                                                 start_timestamp, end_timestamp)
+        return FullStartingScanner(self.table, self.predicate, self.limit, self.read_type)
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
-        if idx_of_this_subtask >= number_of_para_subtasks:
-            raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
-        self.idx_of_this_subtask = idx_of_this_subtask
-        self.number_of_para_subtasks = number_of_para_subtasks
+        self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
         return self
-
-    def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> 
(defaultdict, int, int):
-        total_row = 0
-        # Sort by file creation time to ensure consistent sharding
-        for key, file_entries in partitioned_files.items():
-            for entry in file_entries:
-                total_row += entry.file.row_count
-
-        # Calculate number of rows this shard should process
-        # Last shard handles all remaining rows (handles non-divisible cases)
-        if self.idx_of_this_subtask == self.number_of_para_subtasks - 1:
-            num_row = total_row - total_row // self.number_of_para_subtasks * 
self.idx_of_this_subtask
-        else:
-            num_row = total_row // self.number_of_para_subtasks
-        # Calculate start row and end row position for current shard in all 
data
-        start_row = self.idx_of_this_subtask * (total_row // 
self.number_of_para_subtasks)
-        end_row = start_row + num_row
-
-        plan_start_row = 0
-        plan_end_row = 0
-        entry_end_row = 0  # end row position of current file in all data
-        splits_start_row = 0
-        filtered_partitioned_files = defaultdict(list)
-        # Iterate through all file entries to find files that overlap with 
current shard range
-        for key, file_entries in partitioned_files.items():
-            filtered_entries = []
-            for entry in file_entries:
-                entry_begin_row = entry_end_row  # Starting row position of 
current file in all data
-                entry_end_row += entry.file.row_count  # Update to row 
position after current file
-
-                # If current file is completely after shard range, stop 
iteration
-                if entry_begin_row >= end_row:
-                    break
-                # If current file is completely before shard range, skip it
-                if entry_end_row <= start_row:
-                    continue
-                if entry_begin_row <= start_row < entry_end_row:
-                    splits_start_row = entry_begin_row
-                    plan_start_row = start_row - entry_begin_row
-                # If shard end position is within current file, record 
relative end position
-                if entry_begin_row < end_row <= entry_end_row:
-                    plan_end_row = end_row - splits_start_row
-                # Add files that overlap with shard range to result
-                filtered_entries.append(entry)
-            if filtered_entries:
-                filtered_partitioned_files[key] = filtered_entries
-
-        return filtered_partitioned_files, plan_start_row, plan_end_row
-
-    def _compute_split_start_end_row(self, splits: List[Split], 
plan_start_row, plan_end_row):
-        file_end_row = 0  # end row position of current file in all data
-        for split in splits:
-            files = split.files
-            split_start_row = file_end_row
-            # Iterate through all file entries to find files that overlap with 
current shard range
-            for file in files:
-                file_begin_row = file_end_row  # Starting row position of 
current file in all data
-                file_end_row += file.row_count  # Update to row position after 
current file
-
-                # If shard start position is within current file, record 
actual start position and relative offset
-                if file_begin_row <= plan_start_row < file_end_row:
-                    split.split_start_row = plan_start_row - file_begin_row
-
-                # If shard end position is within current file, record 
relative end position
-                if file_begin_row < plan_end_row <= file_end_row:
-                    split.split_end_row = plan_end_row - split_start_row
-            if split.split_start_row is None:
-                split.split_start_row = 0
-            if split.split_end_row is None:
-                split.split_end_row = split.row_count
-
-    def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) 
-> List[ManifestEntry]:
-        filtered_entries = []
-        for entry in file_entries:
-            if entry.bucket % self.number_of_para_subtasks == 
self.idx_of_this_subtask:
-                filtered_entries.append(entry)
-        return filtered_entries
-
-    def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
-        bucket = entry.bucket
-        if self.only_read_real_buckets and bucket < 0:
-            return False
-        return True
-
-    def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
-        if self.limit is None:
-            return splits
-        scanned_row_count = 0
-        limited_splits = []
-
-        for split in splits:
-            if split.raw_convertible:
-                limited_splits.append(split)
-                scanned_row_count += split.row_count
-                if scanned_row_count >= self.limit:
-                    return limited_splits
-
-        return limited_splits
-
-    def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> 
List[ManifestEntry]:
-        if not self.predicate:
-            return file_entries
-
-        filtered_files = []
-        for file_entry in file_entries:
-            if self.partition_key_predicate and not 
self._filter_by_partition(file_entry):
-                continue
-            if not self._filter_by_stats(file_entry):
-                continue
-            filtered_files.append(file_entry)
-
-        return filtered_files
-
-    def _filter_by_partition(self, file_entry: ManifestEntry) -> bool:
-        partition_dict = file_entry.partition.to_dict()
-        for field_name, conditions in self.partition_key_predicate.items():
-            partition_value = partition_dict[field_name]
-            for predicate in conditions:
-                if not predicate.test_by_value(partition_value):
-                    return False
-        return True
-
-    def _filter_by_stats(self, file_entry: ManifestEntry) -> bool:
-        if file_entry.kind != 0:
-            return False
-        if self.table.is_primary_key_table:
-            predicate = self.primary_key_predicate
-            stats = file_entry.file.key_stats
-        else:
-            predicate = self.predicate
-            stats = file_entry.file.value_stats
-        return predicate.test_by_stats({
-            "min_values": stats.min_values.to_dict(),
-            "max_values": stats.max_values.to_dict(),
-            "null_counts": {
-                stats.min_values.fields[i].name: stats.null_counts[i] for i in 
range(len(stats.min_values.fields))
-            },
-            "row_count": file_entry.file.row_count,
-        })
-
-    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) 
-> List['Split']:
-        """
-        Create data evolution splits for append-only tables with schema 
evolution.
-        This method groups files by firstRowId and creates splits that can 
handle
-        column merging across different schema versions.
-        """
-        partitioned_files = defaultdict(list)
-        for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
-
-        if self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_row, plan_end_row = 
self._append_only_filter_by_shard(partitioned_files)
-
-        def weight_func(file_list: List[DataFileMeta]) -> int:
-            return max(sum(f.file_size for f in file_list), 
self.open_file_cost)
-
-        splits = []
-        for key, file_entries in partitioned_files.items():
-            if not file_entries:
-                continue
-
-            data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
-            # Split files by firstRowId for data evolution
-            split_by_row_id = self._split_by_row_id(data_files)
-
-            # Pack the split groups for optimal split sizes
-            packed_files: List[List[List[DataFileMeta]]] = 
self._pack_for_ordered(split_by_row_id, weight_func,
-                                                                               
   self.target_split_size)
-
-            # Flatten the packed files and build splits
-            flatten_packed_files: List[List[DataFileMeta]] = [
-                [file for sub_pack in pack for file in sub_pack]
-                for pack in packed_files
-            ]
-
-            splits += self._build_split_from_pack(flatten_packed_files, 
file_entries, False)
-
-        if self.idx_of_this_subtask is not None:
-            self._compute_split_start_end_row(splits, plan_start_row, 
plan_end_row)
-        return splits
-
-    def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
-        partitioned_files = defaultdict(list)
-        for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
-
-        if self.idx_of_this_subtask is not None:
-            partitioned_files, plan_start_row, plan_end_row = 
self._append_only_filter_by_shard(partitioned_files)
-
-        def weight_func(f: DataFileMeta) -> int:
-            return max(f.file_size, self.open_file_cost)
-
-        splits = []
-        for key, file_entries in partitioned_files.items():
-            if not file_entries:
-                return []
-
-            data_files: List[DataFileMeta] = [e.file for e in file_entries]
-
-            packed_files: List[List[DataFileMeta]] = 
self._pack_for_ordered(data_files, weight_func,
-                                                                            
self.target_split_size)
-            splits += self._build_split_from_pack(packed_files, file_entries, 
False)
-        if self.idx_of_this_subtask is not None:
-            self._compute_split_start_end_row(splits, plan_start_row, 
plan_end_row)
-        return splits
-
-    def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
-        if self.idx_of_this_subtask is not None:
-            file_entries = self._primary_key_filter_by_shard(file_entries)
-        partitioned_files = defaultdict(list)
-        for entry in file_entries:
-            partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
-
-        def weight_func(fl: List[DataFileMeta]) -> int:
-            return max(sum(f.file_size for f in fl), self.open_file_cost)
-
-        splits = []
-        for key, file_entries in partitioned_files.items():
-            if not file_entries:
-                return []
-
-            data_files: List[DataFileMeta] = [e.file for e in file_entries]
-            partition_sort_runs: List[List[SortedRun]] = 
IntervalPartition(data_files).partition()
-            sections: List[List[DataFileMeta]] = [
-                [file for s in sl for file in s.files]
-                for sl in partition_sort_runs
-            ]
-
-            packed_files: List[List[List[DataFileMeta]]] = 
self._pack_for_ordered(sections, weight_func,
-                                                                               
   self.target_split_size)
-            flatten_packed_files: List[List[DataFileMeta]] = [
-                [file for sub_pack in pack for file in sub_pack]
-                for pack in packed_files
-            ]
-            splits += self._build_split_from_pack(flatten_packed_files, 
file_entries, True)
-        return splits
-
-    def _build_split_from_pack(self, packed_files, file_entries, 
for_primary_key_split: bool) -> List['Split']:
-        splits = []
-        for file_group in packed_files:
-            raw_convertible = True
-            if for_primary_key_split:
-                raw_convertible = len(file_group) == 1
-
-            file_paths = []
-            total_file_size = 0
-            total_record_count = 0
-
-            for data_file in file_group:
-                data_file.set_file_path(self.table.table_path, 
file_entries[0].partition,
-                                        file_entries[0].bucket)
-                file_paths.append(data_file.file_path)
-                total_file_size += data_file.file_size
-                total_record_count += data_file.row_count
-
-            if file_paths:
-                split = Split(
-                    files=file_group,
-                    partition=file_entries[0].partition,
-                    bucket=file_entries[0].bucket,
-                    _file_paths=file_paths,
-                    _row_count=total_record_count,
-                    _file_size=total_file_size,
-                    raw_convertible=raw_convertible
-                )
-                splits.append(split)
-        return splits
-
-    @staticmethod
-    def _pack_for_ordered(items: List, weight_func: Callable, target_weight: 
int) -> List[List]:
-        packed = []
-        bin_items = []
-        bin_weight = 0
-
-        for item in items:
-            weight = weight_func(item)
-            if bin_weight + weight > target_weight and len(bin_items) > 0:
-                packed.append(list(bin_items))
-                bin_items.clear()
-                bin_weight = 0
-
-            bin_weight += weight
-            bin_items.append(item)
-
-        if len(bin_items) > 0:
-            packed.append(bin_items)
-
-        return packed
-
-    @staticmethod
-    def _is_blob_file(file_name: str) -> bool:
-        """Check if a file is a blob file based on its extension."""
-        return file_name.endswith('.blob')
-
-    def _split_by_row_id(self, files: List[DataFileMeta]) -> 
List[List[DataFileMeta]]:
-        """
-        Split files by firstRowId for data evolution.
-        This method groups files that have the same firstRowId, which is 
essential
-        for handling schema evolution where files with different schemas need 
to be
-        read together to merge columns.
-        """
-        split_by_row_id = []
-
-        # Sort files by firstRowId and then by maxSequenceNumber
-        # Files with null firstRowId are treated as having Long.MIN_VALUE
-        def sort_key(file: DataFileMeta) -> tuple:
-            first_row_id = file.first_row_id if file.first_row_id is not None 
else float('-inf')
-            is_blob = 1 if self._is_blob_file(file.file_name) else 0
-            # For files with same firstRowId, sort by maxSequenceNumber in 
descending order
-            # (larger sequence number means more recent data)
-            max_seq = file.max_sequence_number
-            return (first_row_id, is_blob, -max_seq)
-
-        sorted_files = sorted(files, key=sort_key)
-
-        # Split files by firstRowId
-        last_row_id = -1
-        check_row_id_start = 0
-        current_split = []
-
-        for file in sorted_files:
-            first_row_id = file.first_row_id
-            if first_row_id is None:
-                # Files without firstRowId are treated as individual splits
-                split_by_row_id.append([file])
-                continue
-
-            if not self._is_blob_file(file.file_name) and first_row_id != 
last_row_id:
-                if current_split:
-                    split_by_row_id.append(current_split)
-
-                # Validate that files don't overlap
-                if first_row_id < check_row_id_start:
-                    file_names = [f.file_name for f in sorted_files]
-                    raise ValueError(
-                        f"There are overlapping files in the split: 
{file_names}, "
-                        f"the wrong file is: {file.file_name}"
-                    )
-
-                current_split = []
-                last_row_id = first_row_id
-                check_row_id_start = first_row_id + file.row_count
-
-            current_split.append(file)
-
-        if current_split:
-            split_by_row_id.append(current_split)
-
-        return split_by_row_id
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index 2ded357802..87b42c6f35 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -59,3 +59,58 @@ class SnapshotManager:
             Path to the snapshot file
         """
         return self.snapshot_dir / f"snapshot-{snapshot_id}"
+
+    def try_get_earliest_snapshot(self) -> Optional[Snapshot]:
+        if self.file_io.exists(self.snapshot_dir / "EARLIEST"):
+            earliest_content = self.file_io.read_file_utf8(self.snapshot_dir / "EARLIEST")
+            earliest_snapshot_id = int(earliest_content.strip())
+            return self.get_snapshot_by_id(earliest_snapshot_id)
+        else:
+            return self.get_snapshot_by_id(1)
+
+    def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]:
+        """
+        Find the latest snapshot with time_millis <= the given timestamp.
+
+        Args:
+            timestamp: The timestamp to compare against
+
+        Returns:
+            The latest snapshot with time_millis <= timestamp, or None if no such snapshot exists
+        """
+        earliest = 1
+        latest = self.get_latest_snapshot().id
+        final_snapshot = None
+
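+        # Binary search over snapshot ids, assuming commit timestamps are non-decreasing with snapshot id.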
+        while earliest <= latest:
+            mid = earliest + (latest - earliest) // 2
+            snapshot = self.get_snapshot_by_id(mid)
+            commit_time = snapshot.time_millis
+
+            if commit_time > timestamp:
+                latest = mid - 1
+            elif commit_time < timestamp:
+                earliest = mid + 1
+                final_snapshot = snapshot
+            else:
+                final_snapshot = snapshot
+                break
+
+        return final_snapshot
+
+    def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]:
+        """
+        Get a snapshot by its ID.
+
+        Args:
+            snapshot_id: The snapshot ID
+
+        Returns:
+            The snapshot with the specified ID, or None if not found
+        """
+        snapshot_file = self.get_snapshot_path(snapshot_id)
+        if not self.file_io.exists(snapshot_file):
+            return None
+
+        snapshot_content = self.file_io.read_file_utf8(snapshot_file)
+        return JSON.from_json(snapshot_content, Snapshot)
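As a quick orientation, the sketch below shows how these new helpers could be combined to map a commit-time bound onto a snapshot. It is illustrative only, not part of the patch; `table` is assumed to be a table handle obtained from a catalog and `end_millis` is a placeholder timestamp.

    from pypaimon.snapshot.snapshot_manager import SnapshotManager

    manager = SnapshotManager(table)            # `table` is an assumed catalog table handle
    latest = manager.get_latest_snapshot()      # existing helper, used by the binary search above
    # Latest snapshot committed at or before `end_millis`, or None if none qualifies.
    end_snapshot = manager.earlier_or_equal_time_mills(end_millis)
    # Direct lookup returns None when the snapshot file does not exist.
    maybe_first = manager.get_snapshot_by_id(1)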
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index e0d7c7e603..57ccbf83bb 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -104,3 +104,20 @@ class FileStoreTable(Table):
             return DynamicBucketRowKeyExtractor(self.table_schema)
         else:
             raise ValueError(f"Unsupported bucket mode: {bucket_mode}")
+
+    def copy(self, options: dict) -> 'FileStoreTable':
+        if CoreOptions.BUCKET in options and options.get(CoreOptions.BUCKET) != self.options.get(CoreOptions.BUCKET):
+            raise ValueError("Cannot change bucket number")
+        new_options = self.options.copy()
+        for k, v in options.items():
+            if v is None:
+                new_options.pop(k)
+            else:
+                new_options[k] = v
+        new_table_schema = self.table_schema.copy(new_options=new_options)
+        return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema,
+                              self.catalog_environment)
+
+    def add_options(self, options: dict):
+        for key, value in options.items():
+            self.options[key] = value
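The copy semantics added above can be summarized with a small hedged example; `table` is assumed to be an existing FileStoreTable loaded from a catalog, and the option values are placeholders.

    from pypaimon.common.core_options import CoreOptions

    # copy() merges the given options onto the existing ones and returns a new FileStoreTable;
    # a None value drops that key, and changing the bucket number raises ValueError.
    scoped = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: '1000,2000'})
    restored = scoped.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: None})
    # add_options() mutates the current table's options in place instead of copying.
    table.add_options({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: '1000,2000'})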
diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
index d6b9433cba..530e33aa78 100644
--- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py
+++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py
@@ -19,13 +19,12 @@ import shutil
 import tempfile
 import unittest
 
+from pypaimon import CatalogFactory, Schema
 from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
                                                 DatabaseNotExistException,
                                                 TableAlreadyExistException,
                                                 TableNotExistException)
-from pypaimon import CatalogFactory
 from pypaimon.schema.data_types import AtomicType, DataField
-from pypaimon import Schema
 from pypaimon.table.file_store_table import FileStoreTable
 
 
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 7976094290..9ab1cfbb37 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -23,8 +23,7 @@ import unittest
 import pandas as pd
 import pyarrow as pa
 
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
 
 
 def _check_filtered_result(read_builder, expected_df):
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
index 92a3e9601a..b06a69baa8 100644
--- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -22,12 +22,12 @@ import unittest
 import pandas as pd
 import pyarrow as pa
 
-from pypaimon import CatalogFactory
-from pypaimon import Schema
-from pypaimon.tests.predicates_test import _random_format, _check_filtered_result
+from pypaimon import CatalogFactory, Schema
+from pypaimon.tests.predicates_test import (_check_filtered_result,
+                                            _random_format)
 
 
-class PredicatePy36Test(unittest.TestCase):
+class AOPredicatePy36Test(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index e2a61df301..efb3189e06 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -20,8 +20,10 @@ from unittest.mock import patch
 import pyarrow as pa
 
 from pypaimon import Schema
-from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \
-    DatabaseNotExistException, DatabaseAlreadyExistException
+from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException,
+                                                DatabaseNotExistException,
+                                                TableAlreadyExistException,
+                                                TableNotExistException)
 from pypaimon.common.config import OssOptions
 from pypaimon.common.file_io import FileIO
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index e6374132af..fe6902a5f5 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -16,34 +16,36 @@ See the License for the specific language governing permissions and
 limitations under the License.
 """
 import logging
-from datetime import datetime, date
+import time
+from datetime import date, datetime
 from decimal import Decimal
 from unittest.mock import Mock
 
-import pandas as pd
 import numpy as np
+import pandas as pd
 import pyarrow as pa
 
+from pypaimon import CatalogFactory, Schema
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
-from pypaimon import CatalogFactory
 from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.identifier import Identifier
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.schema.data_types import DataField, AtomicType
-from pypaimon import Schema
-from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
+                                            GenericRowSerializer)
 from pypaimon.table.row.row_kind import RowKind
 from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
-
 from pypaimon.write.file_store_commit import FileStoreCommit
 
 
-class RESTReadWritePy36Test(RESTBaseTest):
+class RESTAOReadWritePy36Test(RESTBaseTest):
 
     def test_overwrite(self):
         simple_pa_schema = pa.schema([
@@ -175,10 +177,10 @@ class RESTReadWritePy36Test(RESTBaseTest):
         self.assertEqual(actual_data, expect_data)
 
         # to test GenericRow ability
-        latest_snapshot = table_scan.snapshot_manager.get_latest_snapshot()
-        manifest_files = table_scan.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = table_scan.manifest_file_manager.read(manifest_files[0].file_name,
-                                                                 lambda row: table_scan._bucket_filter(row))
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row))
         min_value_stats = manifest_entries[0].file.value_stats.min_values.values
         max_value_stats = manifest_entries[0].file.value_stats.max_values.values
         expected_min_values = [col[0].as_py() for col in expect_data]
@@ -737,6 +739,33 @@ class RESTReadWritePy36Test(RESTBaseTest):
             test_name="specific_case"
         )
 
+    def test_incremental_timestamp(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_incremental_parquet', schema, False)
+        table = self.rest_catalog.get_table('default.test_incremental_parquet')
+        timestamp = int(time.time() * 1000)
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+        t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+        # test 1
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(len(actual), 0)
+        # test 2
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        self.assertEqual(self.expected, actual)
+        # test 3
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        expected = self.expected.slice(4, 4)
+        self.assertEqual(expected, actual)
+
     def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name):
         """Helper method to test a specific _VALUE_STATS_COLS case."""
 
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index db0cbcccd1..0367ab409c 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -18,14 +18,16 @@
 
 import os
 import tempfile
+import time
 import unittest
 
-import pyarrow as pa
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class AoReaderTest(unittest.TestCase):
@@ -277,6 +279,69 @@ class AoReaderTest(unittest.TestCase):
         # might be split of "dt=1" or split of "dt=2"
         self.assertEqual(actual.num_rows, 4)
 
+    def test_incremental_timestamp(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_incremental_parquet', schema, False)
+        table = self.catalog.get_table('default.test_incremental_parquet')
+        timestamp = int(time.time() * 1000)
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+        t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+        # test 1
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(len(actual), 0)
+        # test 2
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(self.expected, actual)
+        # test 3
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        expected = self.expected.slice(4, 4)
+        self.assertEqual(expected, actual)
+
+    def test_incremental_read_multi_snapshots(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_incremental_100', schema, False)
+        table = self.catalog.get_table('default.test_incremental_100')
+
+        write_builder = table.new_batch_write_builder()
+        for i in range(1, 101):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            pa_table = pa.Table.from_pydict({
+                'user_id': [i],
+                'item_id': [1000 + i],
+                'behavior': [f'snap{i}'],
+                'dt': ['p1' if i % 2 == 1 else 'p2'],
+            }, schema=self.pa_schema)
+            table_write.write_arrow(pa_table)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_manager = SnapshotManager(table)
+        t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
+        t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
+
+        table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"})
+        read_builder = table_inc.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+
+        expected = pa.Table.from_pydict({
+            'user_id': list(range(11, 21)),
+            'item_id': [1000 + i for i in range(11, 21)],
+            'behavior': [f'snap{i}' for i in range(11, 21)],
+            'dt': ['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)],
+        }, schema=self.pa_schema).sort_by('user_id')
+        self.assertEqual(expected, actual)
+
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
 
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 6e9dc1ffc6..6bb2cdd675 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -16,29 +16,28 @@
 # limitations under the License.
 ################################################################################
 
-import os
 import glob
+import os
 import shutil
 import tempfile
 import unittest
-from datetime import datetime, date, time
+from datetime import date, datetime, time
 from decimal import Decimal
 from unittest.mock import Mock
 
 import pandas as pd
 import pyarrow as pa
 
-from pypaimon.table.row.generic_row import GenericRow
-
-from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
-                                        MapType, PyarrowFieldParser)
-from pypaimon.schema.table_schema import TableSchema
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
-from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
+                                        MapType, PyarrowFieldParser)
+from pypaimon.schema.table_schema import TableSchema
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.write.file_store_commit import FileStoreCommit
 
 
@@ -215,10 +214,10 @@ class ReaderBasicTest(unittest.TestCase):
         self.assertEqual(actual_data, expect_data)
 
         # to test GenericRow ability
-        latest_snapshot = table_scan.snapshot_manager.get_latest_snapshot()
-        manifest_files = table_scan.manifest_list_manager.read_all(latest_snapshot)
-        manifest_entries = table_scan.manifest_file_manager.read(manifest_files[0].file_name,
-                                                                 lambda row: table_scan._bucket_filter(row))
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row))
         min_value_stats = manifest_entries[0].file.value_stats.min_values.values
         max_value_stats = manifest_entries[0].file.value_stats.max_values.values
         expected_min_values = [col[0].as_py() for col in expect_data]
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index b992595fc9..bcbc94bd68 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -19,12 +19,14 @@
 import os
 import shutil
 import tempfile
+import time
 import unittest
 
 import pyarrow as pa
 
-from pypaimon import CatalogFactory
-from pypaimon import Schema
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.snapshot.snapshot_manager import SnapshotManager
 
 
 class PkReaderTest(unittest.TestCase):
@@ -184,6 +186,79 @@ class PkReaderTest(unittest.TestCase):
         expected = self.expected.select(['dt', 'user_id', 'behavior'])
         self.assertEqual(actual, expected)
 
+    def test_incremental_timestamp(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_incremental_parquet', schema, False)
+        table = self.catalog.get_table('default.test_incremental_parquet')
+        timestamp = int(time.time() * 1000)
+        self._write_test_table(table)
+
+        snapshot_manager = SnapshotManager(table)
+        t1 = snapshot_manager.get_snapshot_by_id(1).time_millis
+        t2 = snapshot_manager.get_snapshot_by_id(2).time_millis
+        # test 1
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(len(actual), 0)
+        # test 2
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(self.expected, actual)
+        # test 3
+        table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)})
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        expected = pa.Table.from_pydict({
+            "user_id": [2, 5, 7, 8],
+            "item_id": [1002, 1005, 1007, 1008],
+            "behavior": ["b-new", "e", "g", "h"],
+            "dt": ["p1", "p2", "p1", "p2"]
+        }, schema=self.pa_schema)
+        self.assertEqual(expected, actual)
+
+    def test_incremental_read_multi_snapshots(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '2'})
+        self.catalog.create_table('default.test_incremental_read_multi_snapshots', schema, False)
+        table = self.catalog.get_table('default.test_incremental_read_multi_snapshots')
+        write_builder = table.new_batch_write_builder()
+        for i in range(1, 101):
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+            pa_table = pa.Table.from_pydict({
+                'user_id': [i],
+                'item_id': [1000 + i],
+                'behavior': [f'snap{i}'],
+                'dt': ['p1' if i % 2 == 1 else 'p2'],
+            }, schema=self.pa_schema)
+            table_write.write_arrow(pa_table)
+            table_commit.commit(table_write.prepare_commit())
+            table_write.close()
+            table_commit.close()
+
+        snapshot_manager = SnapshotManager(table)
+        t10 = snapshot_manager.get_snapshot_by_id(10).time_millis
+        t20 = snapshot_manager.get_snapshot_by_id(20).time_millis
+
+        table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"})
+        read_builder = table_inc.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+
+        expected = pa.Table.from_pydict({
+            'user_id': list(range(11, 21)),
+            'item_id': [1000 + i for i in range(11, 21)],
+            'behavior': [f'snap{i}' for i in range(11, 21)],
+            'dt': ['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)],
+        }, schema=self.pa_schema).sort_by('user_id')
+        self.assertEqual(expected, actual)
+
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
 
diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py
index 3a83ccb285..e45fac5cde 100644
--- a/paimon-python/pypaimon/tests/rest/rest_base_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py
@@ -25,17 +25,16 @@ import uuid
 
 import pyarrow as pa
 
+from pypaimon import CatalogFactory, Schema
 from pypaimon.api.api_response import ConfigResponse
 from pypaimon.api.auth import BearTokenAuthProvider
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
-from pypaimon import CatalogFactory
 from pypaimon.catalog.rest.rest_catalog import RESTCatalog
 from pypaimon.catalog.rest.table_metadata import TableMetadata
 from pypaimon.common.identifier import Identifier
 from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
                                         MapType)
-from pypaimon import Schema
 from pypaimon.schema.table_schema import TableSchema
 from pypaimon.tests.rest.rest_server import RESTCatalogServer
 
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 10d2796b76..4e5b4d723e 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -27,7 +27,7 @@ from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.read.table_scan import TableScan
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
                                                SnapshotCommit)
@@ -101,7 +101,7 @@ class FileStoreCommit:
                                        f"in {msg.partition} does not belong to 
this partition")
 
         commit_entries = []
-        current_entries = TableScan(self.table, partition_filter, None, []).plan_files()
+        current_entries = FullStartingScanner(self.table, partition_filter, None, []).plan_files()
         for entry in current_entries:
             entry.kind = 1
             commit_entries.append(entry)
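
For context, the overwrite path now plans the existing entries through the scanner abstraction instead of TableScan; a minimal sketch of that call, assuming `table` and `partition_filter` are already in scope (the positional arguments mirror the hunk above):

    from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner

    # Plan the files currently visible for the partitions being overwritten.
    current_entries = FullStartingScanner(table, partition_filter, None, []).plan_files()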
