This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 8bda95e1540396d7ec5d305b2041b5f6bc9877f0
Author: umi <[email protected]>
AuthorDate: Fri Oct 17 10:03:44 2025 +0800

    [Python] Introduce incremental-between read by timestamp (#6391)
---
 docs/content/program-api/python-api.md | 192 ++++++++-
 paimon-python/pypaimon/__init__.py | 2 +-
 paimon-python/pypaimon/common/core_options.py | 1 +
 paimon-python/pypaimon/filesystem/pvfs.py | 2 +-
 .../pypaimon/manifest/manifest_file_manager.py | 3 +-
 .../pypaimon/manifest/manifest_list_manager.py | 3 +
 paimon-python/pypaimon/read/scanner/__init__.py | 17 +
 .../read/scanner/empty_starting_scanner.py | 25 ++
 .../full_starting_scanner.py} | 151 +------
 .../read/scanner/incremental_starting_scanner.py | 77 ++++
 .../pypaimon/read/scanner/starting_scanner.py | 28 ++
 paimon-python/pypaimon/read/table_read.py | 2 +-
 paimon-python/pypaimon/read/table_scan.py | 463 ++-------------------
 .../pypaimon/snapshot/snapshot_manager.py | 55 +++
 paimon-python/pypaimon/table/file_store_table.py | 17 +
 .../pypaimon/tests/filesystem_catalog_test.py | 3 +-
 paimon-python/pypaimon/tests/predicates_test.py | 3 +-
 .../pypaimon/tests/py36/ao_predicate_test.py | 8 +-
 .../pypaimon/tests/py36/ao_simple_test.py | 6 +-
 .../pypaimon/tests/py36/rest_ao_read_write_test.py | 53 ++-
 .../pypaimon/tests/reader_append_only_test.py | 71 +++-
 paimon-python/pypaimon/tests/reader_base_test.py | 27 +-
 .../pypaimon/tests/reader_primary_key_test.py | 79 +++-
 .../pypaimon/tests/rest/rest_base_test.py | 3 +-
 paimon-python/pypaimon/write/file_store_commit.py | 4 +-
 25 files changed, 670 insertions(+), 625 deletions(-)

diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index 6f7d8bb697..579fce109e 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -286,7 +286,8 @@ for batch in table_read.to_arrow_batch_reader(splits):
 ```
 
 #### Python Iterator
-You can read the data row by row into a native Python iterator.
+
+You can read the data row by row into a native Python iterator. This is convenient for custom row-based processing logic.
 
 ```python
@@ -365,23 +366,177 @@ print(ray_dataset.to_pandas())
 # ...
 ```
 
+### Incremental Read Between Timestamps
+
+This API allows reading data committed between two snapshot timestamps. The steps are as follows:
+
+- Set the option `CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP` on a copied table via `table.copy({...})`. The value must
+  be a string: `"startMillis,endMillis"`, where `startMillis` is exclusive and `endMillis` is inclusive.
+- Use `SnapshotManager` to obtain snapshot timestamps, or determine them yourself.
+- Read the data as above; a short sketch of the underlying snapshot selection and a full example follow.
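Before the full example, here is a rough sketch of how the scanner introduced by this patch picks snapshots: `IncrementalStartingScanner` resolves the two timestamps to a snapshot ID range and then reads only the delta manifests of `APPEND` snapshots inside that range. The helper below and its name are illustrative only and are not part of the pypaimon API; the actual logic lives in `IncrementalStartingScanner.plan_files` further down in this commit.

```python
# Illustrative sketch only; not library code. It mirrors the selection done by
# IncrementalStartingScanner.plan_files in this commit: start_id is exclusive,
# end_id is inclusive, and only APPEND snapshots contribute incremental files.
def append_snapshots_in_range(snapshot_manager, start_id, end_id):
    selected = []
    for snapshot_id in range(start_id + 1, end_id + 1):
        snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
        # snapshots with other commit kinds (e.g. compactions) are skipped
        if snapshot is not None and snapshot.commit_kind == "APPEND":
            selected.append(snapshot)
    return selected
```

Because the starting timestamp is exclusive, passing the commit time of snapshot N as the start skips snapshot N itself and begins reading from snapshot N+1.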
+ +Example: + +```python +from pypaimon import CatalogFactory +from pypaimon.common.core_options import CoreOptions +from pypaimon.snapshot.snapshot_manager import SnapshotManager + +# Prepare catalog and obtain a table +catalog = CatalogFactory.create({'warehouse': '/path/to/warehouse'}) +table = catalog.get_table('default.your_table_name') + +# Assume the table has at least two snapshots (1 and 2) +snapshot_manager = SnapshotManager(table) +t1 = snapshot_manager.get_snapshot_by_id(1).time_millis +t2 = snapshot_manager.get_snapshot_by_id(2).time_millis + +# Read records committed between [t1, t2] +table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t1},{t2}"}) + +read_builder = table_inc.new_read_builder() +table_scan = read_builder.new_scan() +table_read = read_builder.new_read() +splits = table_scan.plan().splits() + +# To Arrow +arrow_table = table_read.to_arrow(splits) + +# Or to pandas +pandas_df = table_read.to_pandas(splits) +``` + +### Shard Read + +Shard Read allows you to read data in parallel by dividing the table into multiple shards. This is useful for +distributed processing and parallel computation. + +You can specify the shard index and total number of shards to read a specific portion of the data: + +```python +# Prepare read builder +table = catalog.get_table('database_name.table_name') +read_builder = table.new_read_builder() +table_read = read_builder.new_read() + +# Read the second shard (index 1) out of 3 total shards +splits = read_builder.new_scan().with_shard(1, 3).plan().splits() + +# Read all shards and concatenate results +splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits() +splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits() +splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits() + +# Combine results from all shards + +all_splits = splits1 + splits2 + splits3 +pa_table = table_read.to_arrow(all_splits) +``` + +Example with shard read: + +```python +import pyarrow as pa +from pypaimon import CatalogFactory, Schema + +# Create catalog +catalog_options = {'warehouse': 'file:///path/to/warehouse'} +catalog = CatalogFactory.create(catalog_options) +catalog.create_database("default", False) +# Define schema +pa_schema = pa.schema([ + ('user_id', pa.int64()), + ('item_id', pa.int64()), + ('behavior', pa.string()), + ('dt', pa.string()), +]) + +# Create table and write data +schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt']) +catalog.create_table('default.test_table', schema, False) +table = catalog.get_table('default.test_table') + +# Write data in two batches +write_builder = table.new_batch_write_builder() + +# First write +table_write = write_builder.new_write() +table_commit = write_builder.new_commit() +data1 = { + 'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14], + 'item_id': [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014], + 'behavior': ['a', 'b', 'c', None, 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'], + 'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'], +} +pa_table = pa.Table.from_pydict(data1, schema=pa_schema) +table_write.write_arrow(pa_table) +table_commit.commit(table_write.prepare_commit()) +table_write.close() +table_commit.close() + +# Second write +table_write = write_builder.new_write() +table_commit = write_builder.new_commit() +data2 = { + 'user_id': [5, 6, 7, 8, 18], + 'item_id': [1005, 1006, 1007, 1008, 1018], + 'behavior': ['e', 'f', 'g', 'h', 'z'], + 'dt': ['p2', 'p1', 
'p2', 'p2', 'p1'], +} +pa_table = pa.Table.from_pydict(data2, schema=pa_schema) +table_write.write_arrow(pa_table) +table_commit.commit(table_write.prepare_commit()) +table_write.close() +table_commit.close() + +# Read specific shard +read_builder = table.new_read_builder() +table_read = read_builder.new_read() + +# Read shard 2 out of 3 total shards +splits = read_builder.new_scan().with_shard(2, 3).plan().splits() +shard_data = table_read.to_arrow(splits) + +# Verify shard distribution by reading all shards +splits1 = read_builder.new_scan().with_shard(0, 3).plan().splits() +splits2 = read_builder.new_scan().with_shard(1, 3).plan().splits() +splits3 = read_builder.new_scan().with_shard(2, 3).plan().splits() + +# Combine all shards should equal full table read +all_shards_data = pa.concat_tables([ + table_read.to_arrow(splits1), + table_read.to_arrow(splits2), + table_read.to_arrow(splits3), +]) +full_table_data = table_read.to_arrow(read_builder.new_scan().plan().splits()) +``` + +Key points about shard read: + +- **Shard Index**: Zero-based index of the shard to read (0 to total_shards-1) +- **Total Shards**: Total number of shards to divide the data into +- **Data Distribution**: Data is distributed evenly across shards, with remainder rows going to the last shard +- **Parallel Processing**: Each shard can be processed independently for better performance +- **Consistency**: Combining all shards should produce the complete table data + ## Data Types -| Python Native Type | PyArrow Type | Paimon Type | -| :--- | :--- | :--- | -| `int` | `pyarrow.int8()` | `TINYINT` | -| `int` | `pyarrow.int16()` | `SMALLINT` | -| `int` | `pyarrow.int32()` | `INT` | -| `int` | `pyarrow.int64()` | `BIGINT` | -| `float` | `pyarrow.float32()` | `FLOAT` | -| `float` | `pyarrow.float64()` | `DOUBLE` | -| `bool` | `pyarrow.bool_()` | `BOOLEAN` | -| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` | -| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` | -| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` | -| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` | -| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` | -| `datetime.date` | `pyarrow.date32()` | `DATE` | -| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` | + +| Python Native Type | PyArrow Type | Paimon Type | +|:--------------------|:-------------------------------------------------|:----------------------------------| +| `int` | `pyarrow.int8()` | `TINYINT` | +| `int` | `pyarrow.int16()` | `SMALLINT` | +| `int` | `pyarrow.int32()` | `INT` | +| `int` | `pyarrow.int64()` | `BIGINT` | +| `float` | `pyarrow.float32()` | `FLOAT` | +| `float` | `pyarrow.float64()` | `DOUBLE` | +| `bool` | `pyarrow.bool_()` | `BOOLEAN` | +| `str` | `pyarrow.string()` | `STRING`, `CHAR(n)`, `VARCHAR(n)` | +| `bytes` | `pyarrow.binary()` | `BYTES`, `VARBINARY(n)` | +| `bytes` | `pyarrow.binary(length)` | `BINARY(length)` | +| `decimal.Decimal` | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)` | +| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)` | `TIMESTAMP(p)` | +| `datetime.date` | `pyarrow.date32()` | `DATE` | +| `datetime.time` | `pyarrow.time32(unit)` or `pyarrow.time64(unit)` | `TIME(p)` | ## Predicate @@ -402,5 +557,4 @@ print(ray_dataset.to_pandas()) | f.contains(literal) | PredicateBuilder.contains(f, literal) | | f is in [l1, l2] | PredicateBuilder.is_in(f, [l1, l2]) | | f is not in [l1, l2] | 
PredicateBuilder.is_not_in(f, [l1, l2]) | -| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) | - +| lower <= f <= upper | PredicateBuilder.between(f, lower, upper) | \ No newline at end of file diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py index 35a8a72ff1..fc4e7d3b61 100644 --- a/paimon-python/pypaimon/__init__.py +++ b/paimon-python/pypaimon/__init__.py @@ -15,8 +15,8 @@ # specific language governing permissions and limitations # under the License. -from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem from pypaimon.catalog.catalog_factory import CatalogFactory +from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem from pypaimon.schema.schema import Schema __version__ = "0.3.dev" diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py index 754bb17c05..82b788438e 100644 --- a/paimon-python/pypaimon/common/core_options.py +++ b/paimon-python/pypaimon/common/core_options.py @@ -45,5 +45,6 @@ class CoreOptions(str, Enum): FILE_BLOCK_SIZE = "file.block-size" # Scan options SCAN_FALLBACK_BRANCH = "scan.fallback-branch" + INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp" # Commit options COMMIT_USER_PREFIX = "commit.user-prefix" diff --git a/paimon-python/pypaimon/filesystem/pvfs.py b/paimon-python/pypaimon/filesystem/pvfs.py index d7dabbbcaa..15f9f9f8dc 100644 --- a/paimon-python/pypaimon/filesystem/pvfs.py +++ b/paimon-python/pypaimon/filesystem/pvfs.py @@ -29,7 +29,7 @@ from fsspec import AbstractFileSystem from fsspec.implementations.local import LocalFileSystem from readerwriterlock import rwlock -from pypaimon.api.api_response import GetTableTokenResponse, GetTableResponse +from pypaimon.api.api_response import GetTableResponse, GetTableTokenResponse from pypaimon.api.client import AlreadyExistsException, NoSuchResourceException from pypaimon.api.rest_api import RESTApi from pypaimon.common.config import CatalogOptions, OssOptions, PVFSOptions diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index e3c9601cf4..9893b8659a 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -15,10 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -import fastavro from io import BytesIO from typing import List +import fastavro + from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA, ManifestEntry) diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index 2fc1eea011..0fc58652f6 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -47,6 +47,9 @@ class ManifestListManager: manifest_files.extend(delta_manifests) return manifest_files + def read_delta(self, snapshot: Snapshot) -> List[ManifestFileMeta]: + return self.read(snapshot.delta_manifest_list) + def read(self, manifest_list_name: str) -> List[ManifestFileMeta]: manifest_files = [] diff --git a/paimon-python/pypaimon/read/scanner/__init__.py b/paimon-python/pypaimon/read/scanner/__init__.py new file mode 100644 index 0000000000..53ed4d36c2 --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/__init__.py @@ -0,0 +1,17 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" diff --git a/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py new file mode 100644 index 0000000000..9c870aa588 --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/empty_starting_scanner.py @@ -0,0 +1,25 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +from pypaimon.read.plan import Plan +from pypaimon.read.scanner.starting_scanner import StartingScanner + + +class EmptyStartingScanner(StartingScanner): + + def scan(self) -> Plan: + return Plan([]) diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py similarity index 72% copy from paimon-python/pypaimon/read/table_scan.py copy to paimon-python/pypaimon/read/scanner/full_starting_scanner.py index d76725ca97..4275b3f3e2 100644 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -1,21 +1,20 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" from collections import defaultdict from typing import Callable, List, Optional @@ -29,17 +28,15 @@ from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.plan import Plan from pypaimon.read.push_down_utils import (extract_predicate_to_dict, extract_predicate_to_list) +from pypaimon.read.scanner.starting_scanner import StartingScanner from pypaimon.read.split import Split from pypaimon.schema.data_types import DataField from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.bucket_mode import BucketMode -class TableScan: - """Implementation of TableScan for native Python reading.""" - - def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], - read_type: List[DataField]): +class FullStartingScanner(StartingScanner): + def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], read_type: List[DataField]): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table @@ -68,16 +65,13 @@ class TableScan: self.only_read_real_buckets = True if int( self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False - self.data_evolution = self.table.options.get('data-evolution.enabled', 'false').lower() == 'true' - def plan(self) -> Plan: + def scan(self) -> Plan: file_entries = self.plan_files() if not file_entries: return Plan([]) if self.table.is_primary_key_table: splits = self._create_primary_key_splits(file_entries) - elif self.data_evolution: - splits = self._create_data_evolution_splits(file_entries) else: splits = self._create_append_only_splits(file_entries) @@ -256,48 +250,6 @@ class TableScan: "row_count": file_entry.file.row_count, }) - def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: - """ - Create data evolution splits for append-only tables with schema evolution. - This method groups files by firstRowId and creates splits that can handle - column merging across different schema versions. 
- """ - partitioned_files = defaultdict(list) - for entry in file_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - if self.idx_of_this_subtask is not None: - partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files) - - def weight_func(file_list: List[DataFileMeta]) -> int: - return max(sum(f.file_size for f in file_list), self.open_file_cost) - - splits = [] - for key, file_entries in partitioned_files.items(): - if not file_entries: - continue - - data_files: List[DataFileMeta] = [e.file for e in file_entries] - - # Split files by firstRowId for data evolution - split_by_row_id = self._split_by_row_id(data_files) - - # Pack the split groups for optimal split sizes - packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func, - self.target_split_size) - - # Flatten the packed files and build splits - flatten_packed_files: List[List[DataFileMeta]] = [ - [file for sub_pack in pack for file in sub_pack] - for pack in packed_files - ] - - splits += self._build_split_from_pack(flatten_packed_files, file_entries, False) - - if self.idx_of_this_subtask is not None: - self._compute_split_start_end_row(splits, plan_start_row, plan_end_row) - return splits - def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: partitioned_files = defaultdict(list) for entry in file_entries: @@ -405,64 +357,3 @@ class TableScan: packed.append(bin_items) return packed - - @staticmethod - def _is_blob_file(file_name: str) -> bool: - """Check if a file is a blob file based on its extension.""" - return file_name.endswith('.blob') - - def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: - """ - Split files by firstRowId for data evolution. - This method groups files that have the same firstRowId, which is essential - for handling schema evolution where files with different schemas need to be - read together to merge columns. 
- """ - split_by_row_id = [] - - # Sort files by firstRowId and then by maxSequenceNumber - # Files with null firstRowId are treated as having Long.MIN_VALUE - def sort_key(file: DataFileMeta) -> tuple: - first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf') - is_blob = 1 if self._is_blob_file(file.file_name) else 0 - # For files with same firstRowId, sort by maxSequenceNumber in descending order - # (larger sequence number means more recent data) - max_seq = file.max_sequence_number - return (first_row_id, is_blob, -max_seq) - - sorted_files = sorted(files, key=sort_key) - - # Split files by firstRowId - last_row_id = -1 - check_row_id_start = 0 - current_split = [] - - for file in sorted_files: - first_row_id = file.first_row_id - if first_row_id is None: - # Files without firstRowId are treated as individual splits - split_by_row_id.append([file]) - continue - - if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: - if current_split: - split_by_row_id.append(current_split) - - # Validate that files don't overlap - if first_row_id < check_row_id_start: - file_names = [f.file_name for f in sorted_files] - raise ValueError( - f"There are overlapping files in the split: {file_names}, " - f"the wrong file is: {file.file_name}" - ) - - current_split = [] - last_row_id = first_row_id - check_row_id_start = first_row_id + file.row_count - - current_split.append(file) - - if current_split: - split_by_row_id.append(current_split) - - return split_by_row_id diff --git a/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py new file mode 100644 index 0000000000..f1a9cd03bc --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/incremental_starting_scanner.py @@ -0,0 +1,77 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +from typing import List, Optional + +from pypaimon.common.predicate import Predicate +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner +from pypaimon.schema.data_types import DataField +from pypaimon.snapshot.snapshot_manager import SnapshotManager + + +class IncrementalStartingScanner(FullStartingScanner): + def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int], + read_type: List[DataField], start: int, end: int): + super().__init__(table, predicate, limit, read_type) + self.startingSnapshotId = start + self.endingSnapshotId = end + + def plan_files(self) -> List[ManifestEntry]: + snapshots_in_range = [] + for snapshot_id in range(self.startingSnapshotId + 1, self.endingSnapshotId + 1): + snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id) + if snapshot.commit_kind == "APPEND": + snapshots_in_range.append(snapshot) + + # Collect all file entries from all snapshots in range + file_entries = [] + + for snapshot in snapshots_in_range: + # Get manifest files for this snapshot + manifest_files = self.manifest_list_manager.read_delta(snapshot) + + # Read all entries from manifest files + for manifest_file in manifest_files: + entries = self.manifest_file_manager.read(manifest_file.file_name) + file_entries.extend(entries) + if self.predicate: + file_entries = self._filter_by_predicate(file_entries) + return file_entries + + @staticmethod + def between_timestamps(table, predicate: Optional[Predicate], limit: Optional[int], + read_type: List[DataField], start_timestamp: int, + end_timestamp: int) -> 'IncrementalStartingScanner': + """ + Create an IncrementalStartingScanner for snapshots between two timestamps. + """ + snapshot_manager = SnapshotManager(table) + starting_snapshot = snapshot_manager.earlier_or_equal_time_mills(start_timestamp) + earliest_snapshot = snapshot_manager.try_get_earliest_snapshot() + + # If earliest_snapshot.time_millis > start_timestamp we should include the earliest_snapshot + if starting_snapshot is None or (earliest_snapshot and earliest_snapshot.time_millis > start_timestamp): + start_id = earliest_snapshot.id - 1 if earliest_snapshot else -1 + else: + start_id = starting_snapshot.id + + end_snapshot = snapshot_manager.earlier_or_equal_time_mills(end_timestamp) + latest_snapshot = snapshot_manager.get_latest_snapshot() + end_id = end_snapshot.id if end_snapshot else (latest_snapshot.id if latest_snapshot else -1) + + return IncrementalStartingScanner(table, predicate, limit, read_type, start_id, end_id) diff --git a/paimon-python/pypaimon/read/scanner/starting_scanner.py b/paimon-python/pypaimon/read/scanner/starting_scanner.py new file mode 100644 index 0000000000..7e6cdfd81a --- /dev/null +++ b/paimon-python/pypaimon/read/scanner/starting_scanner.py @@ -0,0 +1,28 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +""" +from abc import ABC, abstractmethod + +from pypaimon.read.plan import Plan + + +class StartingScanner(ABC): + """Helper class for the first planning of TableScan.""" + + @abstractmethod + def scan(self) -> Plan: + """Plan the files to read.""" diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index b33fb2c6ad..cf32d04d72 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -from typing import Iterator, List, Optional, Any +from typing import Any, Iterator, List, Optional import pandas import pyarrow diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index d76725ca97..93a7babb30 100644 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -16,23 +16,19 @@ # limitations under the License. ################################################################################ -from collections import defaultdict -from typing import Callable, List, Optional +from typing import List, Optional +from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate -from pypaimon.common.predicate_builder import PredicateBuilder -from pypaimon.manifest.manifest_file_manager import ManifestFileManager -from pypaimon.manifest.manifest_list_manager import ManifestListManager -from pypaimon.manifest.schema.data_file_meta import DataFileMeta -from pypaimon.manifest.schema.manifest_entry import ManifestEntry -from pypaimon.read.interval_partition import IntervalPartition, SortedRun + from pypaimon.read.plan import Plan -from pypaimon.read.push_down_utils import (extract_predicate_to_dict, - extract_predicate_to_list) -from pypaimon.read.split import Split +from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner +from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner +from pypaimon.read.scanner.incremental_starting_scanner import \ + IncrementalStartingScanner +from pypaimon.read.scanner.starting_scanner import StartingScanner from pypaimon.schema.data_types import DataField from pypaimon.snapshot.snapshot_manager import SnapshotManager -from pypaimon.table.bucket_mode import BucketMode class TableScan: @@ -46,423 +42,36 @@ class TableScan: self.predicate = predicate self.limit = limit self.read_type = read_type - - self.snapshot_manager = SnapshotManager(table) - self.manifest_list_manager = ManifestListManager(table) - self.manifest_file_manager = ManifestFileManager(table) - - pk_conditions = [] - trimmed_pk = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()] - extract_predicate_to_list(pk_conditions, self.predicate, trimmed_pk) - self.primary_key_predicate = PredicateBuilder(self.table.fields).and_predicates(pk_conditions) - - partition_conditions = defaultdict(list) - extract_predicate_to_dict(partition_conditions, self.predicate, self.table.partition_keys) - self.partition_key_predicate = partition_conditions - - self.target_split_size = 128 * 1024 * 1024 - self.open_file_cost = 4 * 1024 * 1024 - - self.idx_of_this_subtask = None - self.number_of_para_subtasks = None - - self.only_read_real_buckets = True if 
int( - self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False - self.data_evolution = self.table.options.get('data-evolution.enabled', 'false').lower() == 'true' + self.starting_scanner = self._create_starting_scanner() def plan(self) -> Plan: - file_entries = self.plan_files() - if not file_entries: - return Plan([]) - if self.table.is_primary_key_table: - splits = self._create_primary_key_splits(file_entries) - elif self.data_evolution: - splits = self._create_data_evolution_splits(file_entries) - else: - splits = self._create_append_only_splits(file_entries) - - splits = self._apply_push_down_limit(splits) - return Plan(splits) - - def plan_files(self) -> List[ManifestEntry]: - latest_snapshot = self.snapshot_manager.get_latest_snapshot() - if not latest_snapshot: - return [] - manifest_files = self.manifest_list_manager.read_all(latest_snapshot) - - deleted_entries = set() - added_entries = [] - # TODO: filter manifest files by predicate - for manifest_file in manifest_files: - manifest_entries = self.manifest_file_manager.read(manifest_file.file_name, - lambda row: self._bucket_filter(row)) - for entry in manifest_entries: - if entry.kind == 0: - added_entries.append(entry) - else: - deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name)) - - file_entries = [ - entry for entry in added_entries - if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries - ] - if self.predicate: - file_entries = self._filter_by_predicate(file_entries) - return file_entries + return self.starting_scanner.scan() + + def _create_starting_scanner(self) -> Optional[StartingScanner]: + options = self.table.options + if CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP in options: + ts = options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP].split(",") + if len(ts) != 2: + raise ValueError( + "The incremental-between-timestamp must specific start(exclusive) and end timestamp. But is: " + + options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP]) + earliest_snapshot = SnapshotManager(self.table).try_get_earliest_snapshot() + latest_snapshot = SnapshotManager(self.table).get_latest_snapshot() + if earliest_snapshot is None or latest_snapshot is None: + return EmptyStartingScanner() + start_timestamp = int(ts[0]) + end_timestamp = int(ts[1]) + if start_timestamp >= end_timestamp: + raise ValueError( + "Ending timestamp %s should be >= starting timestamp %s." 
% (end_timestamp, start_timestamp)) + if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis + or end_timestamp < earliest_snapshot.time_millis): + return EmptyStartingScanner() + return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit, self.read_type, + start_timestamp, + end_timestamp) + return FullStartingScanner(self.table, self.predicate, self.limit, self.read_type) def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': - if idx_of_this_subtask >= number_of_para_subtasks: - raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks") - self.idx_of_this_subtask = idx_of_this_subtask - self.number_of_para_subtasks = number_of_para_subtasks + self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks) return self - - def _append_only_filter_by_shard(self, partitioned_files: defaultdict) -> (defaultdict, int, int): - total_row = 0 - # Sort by file creation time to ensure consistent sharding - for key, file_entries in partitioned_files.items(): - for entry in file_entries: - total_row += entry.file.row_count - - # Calculate number of rows this shard should process - # Last shard handles all remaining rows (handles non-divisible cases) - if self.idx_of_this_subtask == self.number_of_para_subtasks - 1: - num_row = total_row - total_row // self.number_of_para_subtasks * self.idx_of_this_subtask - else: - num_row = total_row // self.number_of_para_subtasks - # Calculate start row and end row position for current shard in all data - start_row = self.idx_of_this_subtask * (total_row // self.number_of_para_subtasks) - end_row = start_row + num_row - - plan_start_row = 0 - plan_end_row = 0 - entry_end_row = 0 # end row position of current file in all data - splits_start_row = 0 - filtered_partitioned_files = defaultdict(list) - # Iterate through all file entries to find files that overlap with current shard range - for key, file_entries in partitioned_files.items(): - filtered_entries = [] - for entry in file_entries: - entry_begin_row = entry_end_row # Starting row position of current file in all data - entry_end_row += entry.file.row_count # Update to row position after current file - - # If current file is completely after shard range, stop iteration - if entry_begin_row >= end_row: - break - # If current file is completely before shard range, skip it - if entry_end_row <= start_row: - continue - if entry_begin_row <= start_row < entry_end_row: - splits_start_row = entry_begin_row - plan_start_row = start_row - entry_begin_row - # If shard end position is within current file, record relative end position - if entry_begin_row < end_row <= entry_end_row: - plan_end_row = end_row - splits_start_row - # Add files that overlap with shard range to result - filtered_entries.append(entry) - if filtered_entries: - filtered_partitioned_files[key] = filtered_entries - - return filtered_partitioned_files, plan_start_row, plan_end_row - - def _compute_split_start_end_row(self, splits: List[Split], plan_start_row, plan_end_row): - file_end_row = 0 # end row position of current file in all data - for split in splits: - files = split.files - split_start_row = file_end_row - # Iterate through all file entries to find files that overlap with current shard range - for file in files: - file_begin_row = file_end_row # Starting row position of current file in all data - file_end_row += file.row_count # Update to row position after current file - - # If shard start position is within 
current file, record actual start position and relative offset - if file_begin_row <= plan_start_row < file_end_row: - split.split_start_row = plan_start_row - file_begin_row - - # If shard end position is within current file, record relative end position - if file_begin_row < plan_end_row <= file_end_row: - split.split_end_row = plan_end_row - split_start_row - if split.split_start_row is None: - split.split_start_row = 0 - if split.split_end_row is None: - split.split_end_row = split.row_count - - def _primary_key_filter_by_shard(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]: - filtered_entries = [] - for entry in file_entries: - if entry.bucket % self.number_of_para_subtasks == self.idx_of_this_subtask: - filtered_entries.append(entry) - return filtered_entries - - def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool: - bucket = entry.bucket - if self.only_read_real_buckets and bucket < 0: - return False - return True - - def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]: - if self.limit is None: - return splits - scanned_row_count = 0 - limited_splits = [] - - for split in splits: - if split.raw_convertible: - limited_splits.append(split) - scanned_row_count += split.row_count - if scanned_row_count >= self.limit: - return limited_splits - - return limited_splits - - def _filter_by_predicate(self, file_entries: List[ManifestEntry]) -> List[ManifestEntry]: - if not self.predicate: - return file_entries - - filtered_files = [] - for file_entry in file_entries: - if self.partition_key_predicate and not self._filter_by_partition(file_entry): - continue - if not self._filter_by_stats(file_entry): - continue - filtered_files.append(file_entry) - - return filtered_files - - def _filter_by_partition(self, file_entry: ManifestEntry) -> bool: - partition_dict = file_entry.partition.to_dict() - for field_name, conditions in self.partition_key_predicate.items(): - partition_value = partition_dict[field_name] - for predicate in conditions: - if not predicate.test_by_value(partition_value): - return False - return True - - def _filter_by_stats(self, file_entry: ManifestEntry) -> bool: - if file_entry.kind != 0: - return False - if self.table.is_primary_key_table: - predicate = self.primary_key_predicate - stats = file_entry.file.key_stats - else: - predicate = self.predicate - stats = file_entry.file.value_stats - return predicate.test_by_stats({ - "min_values": stats.min_values.to_dict(), - "max_values": stats.max_values.to_dict(), - "null_counts": { - stats.min_values.fields[i].name: stats.null_counts[i] for i in range(len(stats.min_values.fields)) - }, - "row_count": file_entry.file.row_count, - }) - - def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: - """ - Create data evolution splits for append-only tables with schema evolution. - This method groups files by firstRowId and creates splits that can handle - column merging across different schema versions. 
- """ - partitioned_files = defaultdict(list) - for entry in file_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - if self.idx_of_this_subtask is not None: - partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files) - - def weight_func(file_list: List[DataFileMeta]) -> int: - return max(sum(f.file_size for f in file_list), self.open_file_cost) - - splits = [] - for key, file_entries in partitioned_files.items(): - if not file_entries: - continue - - data_files: List[DataFileMeta] = [e.file for e in file_entries] - - # Split files by firstRowId for data evolution - split_by_row_id = self._split_by_row_id(data_files) - - # Pack the split groups for optimal split sizes - packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func, - self.target_split_size) - - # Flatten the packed files and build splits - flatten_packed_files: List[List[DataFileMeta]] = [ - [file for sub_pack in pack for file in sub_pack] - for pack in packed_files - ] - - splits += self._build_split_from_pack(flatten_packed_files, file_entries, False) - - if self.idx_of_this_subtask is not None: - self._compute_split_start_end_row(splits, plan_start_row, plan_end_row) - return splits - - def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: - partitioned_files = defaultdict(list) - for entry in file_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - if self.idx_of_this_subtask is not None: - partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files) - - def weight_func(f: DataFileMeta) -> int: - return max(f.file_size, self.open_file_cost) - - splits = [] - for key, file_entries in partitioned_files.items(): - if not file_entries: - return [] - - data_files: List[DataFileMeta] = [e.file for e in file_entries] - - packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func, - self.target_split_size) - splits += self._build_split_from_pack(packed_files, file_entries, False) - if self.idx_of_this_subtask is not None: - self._compute_split_start_end_row(splits, plan_start_row, plan_end_row) - return splits - - def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: - if self.idx_of_this_subtask is not None: - file_entries = self._primary_key_filter_by_shard(file_entries) - partitioned_files = defaultdict(list) - for entry in file_entries: - partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) - - def weight_func(fl: List[DataFileMeta]) -> int: - return max(sum(f.file_size for f in fl), self.open_file_cost) - - splits = [] - for key, file_entries in partitioned_files.items(): - if not file_entries: - return [] - - data_files: List[DataFileMeta] = [e.file for e in file_entries] - partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition() - sections: List[List[DataFileMeta]] = [ - [file for s in sl for file in s.files] - for sl in partition_sort_runs - ] - - packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func, - self.target_split_size) - flatten_packed_files: List[List[DataFileMeta]] = [ - [file for sub_pack in pack for file in sub_pack] - for pack in packed_files - ] - splits += self._build_split_from_pack(flatten_packed_files, file_entries, True) - return splits - - def _build_split_from_pack(self, packed_files, file_entries, 
for_primary_key_split: bool) -> List['Split']: - splits = [] - for file_group in packed_files: - raw_convertible = True - if for_primary_key_split: - raw_convertible = len(file_group) == 1 - - file_paths = [] - total_file_size = 0 - total_record_count = 0 - - for data_file in file_group: - data_file.set_file_path(self.table.table_path, file_entries[0].partition, - file_entries[0].bucket) - file_paths.append(data_file.file_path) - total_file_size += data_file.file_size - total_record_count += data_file.row_count - - if file_paths: - split = Split( - files=file_group, - partition=file_entries[0].partition, - bucket=file_entries[0].bucket, - _file_paths=file_paths, - _row_count=total_record_count, - _file_size=total_file_size, - raw_convertible=raw_convertible - ) - splits.append(split) - return splits - - @staticmethod - def _pack_for_ordered(items: List, weight_func: Callable, target_weight: int) -> List[List]: - packed = [] - bin_items = [] - bin_weight = 0 - - for item in items: - weight = weight_func(item) - if bin_weight + weight > target_weight and len(bin_items) > 0: - packed.append(list(bin_items)) - bin_items.clear() - bin_weight = 0 - - bin_weight += weight - bin_items.append(item) - - if len(bin_items) > 0: - packed.append(bin_items) - - return packed - - @staticmethod - def _is_blob_file(file_name: str) -> bool: - """Check if a file is a blob file based on its extension.""" - return file_name.endswith('.blob') - - def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: - """ - Split files by firstRowId for data evolution. - This method groups files that have the same firstRowId, which is essential - for handling schema evolution where files with different schemas need to be - read together to merge columns. - """ - split_by_row_id = [] - - # Sort files by firstRowId and then by maxSequenceNumber - # Files with null firstRowId are treated as having Long.MIN_VALUE - def sort_key(file: DataFileMeta) -> tuple: - first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf') - is_blob = 1 if self._is_blob_file(file.file_name) else 0 - # For files with same firstRowId, sort by maxSequenceNumber in descending order - # (larger sequence number means more recent data) - max_seq = file.max_sequence_number - return (first_row_id, is_blob, -max_seq) - - sorted_files = sorted(files, key=sort_key) - - # Split files by firstRowId - last_row_id = -1 - check_row_id_start = 0 - current_split = [] - - for file in sorted_files: - first_row_id = file.first_row_id - if first_row_id is None: - # Files without firstRowId are treated as individual splits - split_by_row_id.append([file]) - continue - - if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: - if current_split: - split_by_row_id.append(current_split) - - # Validate that files don't overlap - if first_row_id < check_row_id_start: - file_names = [f.file_name for f in sorted_files] - raise ValueError( - f"There are overlapping files in the split: {file_names}, " - f"the wrong file is: {file.file_name}" - ) - - current_split = [] - last_row_id = first_row_id - check_row_id_start = first_row_id + file.row_count - - current_split.append(file) - - if current_split: - split_by_row_id.append(current_split) - - return split_by_row_id diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py b/paimon-python/pypaimon/snapshot/snapshot_manager.py index 2ded357802..87b42c6f35 100644 --- a/paimon-python/pypaimon/snapshot/snapshot_manager.py +++ 
b/paimon-python/pypaimon/snapshot/snapshot_manager.py @@ -59,3 +59,58 @@ class SnapshotManager: Path to the snapshot file """ return self.snapshot_dir / f"snapshot-{snapshot_id}" + + def try_get_earliest_snapshot(self) -> Optional[Snapshot]: + if self.file_io.exists(self.snapshot_dir / "EARLIEST"): + earliest_content = self.file_io.read_file_utf8(self.snapshot_dir / "EARLIEST") + earliest_snapshot_id = int(earliest_content.strip()) + return self.get_snapshot_by_id(earliest_snapshot_id) + else: + return self.get_snapshot_by_id(1) + + def earlier_or_equal_time_mills(self, timestamp: int) -> Optional[Snapshot]: + """ + Find the latest snapshot with time_millis <= the given timestamp. + + Args: + timestamp: The timestamp to compare against + + Returns: + The latest snapshot with time_millis <= timestamp, or None if no such snapshot exists + """ + earliest = 1 + latest = self.get_latest_snapshot().id + final_snapshot = None + + while earliest <= latest: + mid = earliest + (latest - earliest) // 2 + snapshot = self.get_snapshot_by_id(mid) + commit_time = snapshot.time_millis + + if commit_time > timestamp: + latest = mid - 1 + elif commit_time < timestamp: + earliest = mid + 1 + final_snapshot = snapshot + else: + final_snapshot = snapshot + break + + return final_snapshot + + def get_snapshot_by_id(self, snapshot_id: int) -> Optional[Snapshot]: + """ + Get a snapshot by its ID. + + Args: + snapshot_id: The snapshot ID + + Returns: + The snapshot with the specified ID, or None if not found + """ + snapshot_file = self.get_snapshot_path(snapshot_id) + if not self.file_io.exists(snapshot_file): + return None + + snapshot_content = self.file_io.read_file_utf8(snapshot_file) + return JSON.from_json(snapshot_content, Snapshot) diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index e0d7c7e603..57ccbf83bb 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -104,3 +104,20 @@ class FileStoreTable(Table): return DynamicBucketRowKeyExtractor(self.table_schema) else: raise ValueError(f"Unsupported bucket mode: {bucket_mode}") + + def copy(self, options: dict) -> 'FileStoreTable': + if CoreOptions.BUCKET in options and options.get(CoreOptions.BUCKET) != self.options.get(CoreOptions.BUCKET): + raise ValueError("Cannot change bucket number") + new_options = self.options.copy() + for k, v in options.items(): + if v is None: + new_options.pop(k) + else: + new_options[k] = v + new_table_schema = self.table_schema.copy(new_options=new_options) + return FileStoreTable(self.file_io, self.identifier, self.table_path, new_table_schema, + self.catalog_environment) + + def add_options(self, options: dict): + for key, value in options.items(): + self.options[key] = value diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index d6b9433cba..530e33aa78 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -19,13 +19,12 @@ import shutil import tempfile import unittest +from pypaimon import CatalogFactory, Schema from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException, DatabaseNotExistException, TableAlreadyExistException, TableNotExistException) -from pypaimon import CatalogFactory from pypaimon.schema.data_types import AtomicType, DataField -from pypaimon import Schema from pypaimon.table.file_store_table import 
FileStoreTable diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py index 7976094290..9ab1cfbb37 100644 --- a/paimon-python/pypaimon/tests/predicates_test.py +++ b/paimon-python/pypaimon/tests/predicates_test.py @@ -23,8 +23,7 @@ import unittest import pandas as pd import pyarrow as pa -from pypaimon import CatalogFactory -from pypaimon import Schema +from pypaimon import CatalogFactory, Schema def _check_filtered_result(read_builder, expected_df): diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py index 92a3e9601a..b06a69baa8 100644 --- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py @@ -22,12 +22,12 @@ import unittest import pandas as pd import pyarrow as pa -from pypaimon import CatalogFactory -from pypaimon import Schema -from pypaimon.tests.predicates_test import _random_format, _check_filtered_result +from pypaimon import CatalogFactory, Schema +from pypaimon.tests.predicates_test import (_check_filtered_result, + _random_format) -class PredicatePy36Test(unittest.TestCase): +class AOPredicatePy36Test(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py index e2a61df301..efb3189e06 100644 --- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py +++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py @@ -20,8 +20,10 @@ from unittest.mock import patch import pyarrow as pa from pypaimon import Schema -from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \ - DatabaseNotExistException, DatabaseAlreadyExistException +from pypaimon.catalog.catalog_exception import (DatabaseAlreadyExistException, + DatabaseNotExistException, + TableAlreadyExistException, + TableNotExistException) from pypaimon.common.config import OssOptions from pypaimon.common.file_io import FileIO from pypaimon.tests.py36.pyarrow_compat import table_sort_by diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index e6374132af..fe6902a5f5 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -16,34 +16,36 @@ See the License for the specific language governing permissions and limitations under the License. 
""" import logging -from datetime import datetime, date +import time +from datetime import date, datetime from decimal import Decimal from unittest.mock import Mock -import pandas as pd import numpy as np +import pandas as pd import pyarrow as pa +from pypaimon import CatalogFactory, Schema from pypaimon.api.options import Options from pypaimon.catalog.catalog_context import CatalogContext -from pypaimon import CatalogFactory from pypaimon.catalog.rest.rest_catalog import RESTCatalog +from pypaimon.common.core_options import CoreOptions from pypaimon.common.identifier import Identifier from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.simple_stats import SimpleStats -from pypaimon.schema.data_types import DataField, AtomicType -from pypaimon import Schema -from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, GenericRowDeserializer +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.snapshot.snapshot_manager import SnapshotManager +from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer, + GenericRowSerializer) from pypaimon.table.row.row_kind import RowKind from pypaimon.tests.py36.pyarrow_compat import table_sort_by from pypaimon.tests.rest.rest_base_test import RESTBaseTest - from pypaimon.write.file_store_commit import FileStoreCommit -class RESTReadWritePy36Test(RESTBaseTest): +class RESTAOReadWritePy36Test(RESTBaseTest): def test_overwrite(self): simple_pa_schema = pa.schema([ @@ -175,10 +177,10 @@ class RESTReadWritePy36Test(RESTBaseTest): self.assertEqual(actual_data, expect_data) # to test GenericRow ability - latest_snapshot = table_scan.snapshot_manager.get_latest_snapshot() - manifest_files = table_scan.manifest_list_manager.read_all(latest_snapshot) - manifest_entries = table_scan.manifest_file_manager.read(manifest_files[0].file_name, - lambda row: table_scan._bucket_filter(row)) + latest_snapshot = SnapshotManager(table).get_latest_snapshot() + manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( + manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row)) min_value_stats = manifest_entries[0].file.value_stats.min_values.values max_value_stats = manifest_entries[0].file.value_stats.max_values.values expected_min_values = [col[0].as_py() for col in expect_data] @@ -737,6 +739,33 @@ class RESTReadWritePy36Test(RESTBaseTest): test_name="specific_case" ) + def test_incremental_timestamp(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.rest_catalog.create_table('default.test_incremental_parquet', schema, False) + table = self.rest_catalog.get_table('default.test_incremental_parquet') + timestamp = int(time.time() * 1000) + self._write_test_table(table) + + snapshot_manager = SnapshotManager(table) + t1 = snapshot_manager.get_snapshot_by_id(1).time_millis + t2 = snapshot_manager.get_snapshot_by_id(2).time_millis + # test 1 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder) + self.assertEqual(len(actual), 0) + # test 2 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + 
',' + str(t2)}) + read_builder = table.new_read_builder() + actual = table_sort_by(self._read_test_table(read_builder), 'user_id') + self.assertEqual(self.expected, actual) + # test 3 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)}) + read_builder = table.new_read_builder() + actual = table_sort_by(self._read_test_table(read_builder), 'user_id') + expected = self.expected.slice(4, 4) + self.assertEqual(expected, actual) + def _test_value_stats_cols_case(self, manifest_manager, table, value_stats_cols, expected_fields_count, test_name): """Helper method to test a specific _VALUE_STATS_COLS case.""" diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index db0cbcccd1..0367ab409c 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -18,14 +18,16 @@ import os import tempfile +import time import unittest -import pyarrow as pa import numpy as np import pandas as pd +import pyarrow as pa -from pypaimon import CatalogFactory -from pypaimon import Schema +from pypaimon import CatalogFactory, Schema +from pypaimon.common.core_options import CoreOptions +from pypaimon.snapshot.snapshot_manager import SnapshotManager class AoReaderTest(unittest.TestCase): @@ -277,6 +279,69 @@ class AoReaderTest(unittest.TestCase): # might be split of "dt=1" or split of "dt=2" self.assertEqual(actual.num_rows, 4) + def test_incremental_timestamp(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_incremental_parquet', schema, False) + table = self.catalog.get_table('default.test_incremental_parquet') + timestamp = int(time.time() * 1000) + self._write_test_table(table) + + snapshot_manager = SnapshotManager(table) + t1 = snapshot_manager.get_snapshot_by_id(1).time_millis + t2 = snapshot_manager.get_snapshot_by_id(2).time_millis + # test 1 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder) + self.assertEqual(len(actual), 0) + # test 2 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + self.assertEqual(self.expected, actual) + # test 3 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + expected = self.expected.slice(4, 4) + self.assertEqual(expected, actual) + + def test_incremental_read_multi_snapshots(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt']) + self.catalog.create_table('default.test_incremental_100', schema, False) + table = self.catalog.get_table('default.test_incremental_100') + + write_builder = table.new_batch_write_builder() + for i in range(1, 101): + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + pa_table = pa.Table.from_pydict({ + 'user_id': [i], + 'item_id': [1000 + i], + 'behavior': [f'snap{i}'], + 'dt': ['p1' if i % 2 == 1 else 'p2'], + }, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + snapshot_manager = 
SnapshotManager(table) + t10 = snapshot_manager.get_snapshot_by_id(10).time_millis + t20 = snapshot_manager.get_snapshot_by_id(20).time_millis + + table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"}) + read_builder = table_inc.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + + expected = pa.Table.from_pydict({ + 'user_id': list(range(11, 21)), + 'item_id': [1000 + i for i in range(11, 21)], + 'behavior': [f'snap{i}' for i in range(11, 21)], + 'dt': ['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)], + }, schema=self.pa_schema).sort_by('user_id') + self.assertEqual(expected, actual) + def _write_test_table(self, table): write_builder = table.new_batch_write_builder() diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index 6e9dc1ffc6..6bb2cdd675 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -16,29 +16,28 @@ # limitations under the License. ################################################################################ -import os import glob +import os import shutil import tempfile import unittest -from datetime import datetime, date, time +from datetime import date, datetime, time from decimal import Decimal from unittest.mock import Mock import pandas as pd import pyarrow as pa -from pypaimon.table.row.generic_row import GenericRow - -from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, - MapType, PyarrowFieldParser) -from pypaimon.schema.table_schema import TableSchema -from pypaimon import CatalogFactory -from pypaimon import Schema +from pypaimon import CatalogFactory, Schema from pypaimon.manifest.manifest_file_manager import ManifestFileManager -from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, + MapType, PyarrowFieldParser) +from pypaimon.schema.table_schema import TableSchema +from pypaimon.snapshot.snapshot_manager import SnapshotManager +from pypaimon.table.row.generic_row import GenericRow from pypaimon.write.file_store_commit import FileStoreCommit @@ -215,10 +214,10 @@ class ReaderBasicTest(unittest.TestCase): self.assertEqual(actual_data, expect_data) # to test GenericRow ability - latest_snapshot = table_scan.snapshot_manager.get_latest_snapshot() - manifest_files = table_scan.manifest_list_manager.read_all(latest_snapshot) - manifest_entries = table_scan.manifest_file_manager.read(manifest_files[0].file_name, - lambda row: table_scan._bucket_filter(row)) + latest_snapshot = SnapshotManager(table).get_latest_snapshot() + manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( + manifest_files[0].file_name, lambda row: table_scan.starting_scanner._bucket_filter(row)) min_value_stats = manifest_entries[0].file.value_stats.min_values.values max_value_stats = manifest_entries[0].file.value_stats.max_values.values expected_min_values = [col[0].as_py() for col in expect_data] diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index b992595fc9..bcbc94bd68 100644 --- 
a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -19,12 +19,14 @@ import os import shutil import tempfile +import time import unittest import pyarrow as pa -from pypaimon import CatalogFactory -from pypaimon import Schema +from pypaimon import CatalogFactory, Schema +from pypaimon.common.core_options import CoreOptions +from pypaimon.snapshot.snapshot_manager import SnapshotManager class PkReaderTest(unittest.TestCase): @@ -184,6 +186,79 @@ class PkReaderTest(unittest.TestCase): expected = self.expected.select(['dt', 'user_id', 'behavior']) self.assertEqual(actual, expected) + def test_incremental_timestamp(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, + partition_keys=['dt'], + primary_keys=['user_id', 'dt'], + options={'bucket': '2'}) + self.catalog.create_table('default.test_incremental_parquet', schema, False) + table = self.catalog.get_table('default.test_incremental_parquet') + timestamp = int(time.time() * 1000) + self._write_test_table(table) + + snapshot_manager = SnapshotManager(table) + t1 = snapshot_manager.get_snapshot_by_id(1).time_millis + t2 = snapshot_manager.get_snapshot_by_id(2).time_millis + # test 1 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp - 1) + ',' + str(timestamp)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder) + self.assertEqual(len(actual), 0) + # test 2 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(timestamp) + ',' + str(t2)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + self.assertEqual(self.expected, actual) + # test 3 + table = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: str(t1) + ',' + str(t2)}) + read_builder = table.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + expected = pa.Table.from_pydict({ + "user_id": [2, 5, 7, 8], + "item_id": [1002, 1005, 1007, 1008], + "behavior": ["b-new", "e", "g", "h"], + "dt": ["p1", "p2", "p1", "p2"] + }, schema=self.pa_schema) + self.assertEqual(expected, actual) + + def test_incremental_read_multi_snapshots(self): + schema = Schema.from_pyarrow_schema(self.pa_schema, + partition_keys=['dt'], + primary_keys=['user_id', 'dt'], + options={'bucket': '2'}) + self.catalog.create_table('default.test_incremental_read_multi_snapshots', schema, False) + table = self.catalog.get_table('default.test_incremental_read_multi_snapshots') + write_builder = table.new_batch_write_builder() + for i in range(1, 101): + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + pa_table = pa.Table.from_pydict({ + 'user_id': [i], + 'item_id': [1000 + i], + 'behavior': [f'snap{i}'], + 'dt': ['p1' if i % 2 == 1 else 'p2'], + }, schema=self.pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + snapshot_manager = SnapshotManager(table) + t10 = snapshot_manager.get_snapshot_by_id(10).time_millis + t20 = snapshot_manager.get_snapshot_by_id(20).time_millis + + table_inc = table.copy({CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP: f"{t10},{t20}"}) + read_builder = table_inc.new_read_builder() + actual = self._read_test_table(read_builder).sort_by('user_id') + + expected = pa.Table.from_pydict({ + 'user_id': list(range(11, 21)), + 'item_id': [1000 + i for i in range(11, 21)], + 'behavior': [f'snap{i}' for i in range(11, 21)], + 'dt': 
['p1' if i % 2 == 1 else 'p2' for i in range(11, 21)], + }, schema=self.pa_schema).sort_by('user_id') + self.assertEqual(expected, actual) + def _write_test_table(self, table): write_builder = table.new_batch_write_builder() diff --git a/paimon-python/pypaimon/tests/rest/rest_base_test.py b/paimon-python/pypaimon/tests/rest/rest_base_test.py index 3a83ccb285..e45fac5cde 100644 --- a/paimon-python/pypaimon/tests/rest/rest_base_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_base_test.py @@ -25,17 +25,16 @@ import uuid import pyarrow as pa +from pypaimon import CatalogFactory, Schema from pypaimon.api.api_response import ConfigResponse from pypaimon.api.auth import BearTokenAuthProvider from pypaimon.api.options import Options from pypaimon.catalog.catalog_context import CatalogContext -from pypaimon import CatalogFactory from pypaimon.catalog.rest.rest_catalog import RESTCatalog from pypaimon.catalog.rest.table_metadata import TableMetadata from pypaimon.common.identifier import Identifier from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, MapType) -from pypaimon import Schema from pypaimon.schema.table_schema import TableSchema from pypaimon.tests.rest.rest_server import RESTCatalogServer diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 10d2796b76..4e5b4d723e 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -27,7 +27,7 @@ from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats -from pypaimon.read.table_scan import TableScan +from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner from pypaimon.snapshot.snapshot import Snapshot from pypaimon.snapshot.snapshot_commit import (PartitionStatistics, SnapshotCommit) @@ -101,7 +101,7 @@ class FileStoreCommit: f"in {msg.partition} does not belong to this partition") commit_entries = [] - current_entries = TableScan(self.table, partition_filter, None, []).plan_files() + current_entries = FullStartingScanner(self.table, partition_filter, None, []).plan_files() for entry in current_entries: entry.kind = 1 commit_entries.append(entry)
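
The tests above pin down the range semantics of `incremental-between-timestamp`: a window taken entirely before the first commit yields no rows, and the window `(t1, t2)` returns only the changes committed by the second snapshot. As a minimal illustration of that exclusive-start / inclusive-end selection (a standalone sketch, not the scanner code added by this commit; `SnapshotInfo` and `resolve_incremental_snapshots` are hypothetical names), the following selects snapshot metadata by commit time:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SnapshotInfo:
    """Hypothetical stand-in for snapshot metadata: id plus commit time in epoch millis."""
    snapshot_id: int
    time_millis: int


def resolve_incremental_snapshots(snapshots: List[SnapshotInfo],
                                  start_millis: int,
                                  end_millis: int) -> List[SnapshotInfo]:
    """Keep snapshots committed in (start_millis, end_millis]:
    the start bound is exclusive, the end bound is inclusive."""
    return [s for s in snapshots if start_millis < s.time_millis <= end_millis]


# Mirrors the expectations in the tests above: with snapshots committed at t1 and t2,
# the window (t1, t2) resolves to the second snapshot only, and a window that ends
# before the first commit resolves to nothing.
snaps = [SnapshotInfo(1, 1_000), SnapshotInfo(2, 2_000)]
assert [s.snapshot_id for s in resolve_incremental_snapshots(snaps, 1_000, 2_000)] == [2]
assert resolve_incremental_snapshots(snaps, 500, 900) == []
```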
