This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new b6c305815c [python] Introduce postpone bucket table (#6074)
b6c305815c is described below

commit b6c305815cfdb11024c637544e182e2407621ec4
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Thu Aug 14 11:07:53 2025 +0800

    [python] Introduce postpone bucket table (#6074)
---
 .../pypaimon/manifest/manifest_file_manager.py     |  2 +-
 paimon-python/pypaimon/read/table_scan.py          | 25 +++++++----
 paimon-python/pypaimon/schema/schema.py            |  4 +-
 paimon-python/pypaimon/table/bucket_mode.py        |  3 ++
 paimon-python/pypaimon/table/file_store_table.py   |  7 +++-
 paimon-python/pypaimon/tests/rest_table_test.py    | 48 ++++++++++++++++++++++
 paimon-python/pypaimon/tests/writer_test.py        |  2 +-
 paimon-python/pypaimon/write/row_key_extractor.py  | 14 +++++++
 paimon-python/pypaimon/write/writer/data_writer.py | 12 +++++-
 9 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 8f7c8e325c..ef674e5b88 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -73,7 +73,7 @@ class ManifestFileManager:
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
             )
-            if shard_filter and not shard_filter(entry):
+            if not shard_filter(entry):
                 continue
             entries.append(entry)
         return entries
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index fa4ade3a5f..2745b1d1b3 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -29,6 +29,7 @@ from pypaimon.read.plan import Plan
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor
 
 
@@ -55,6 +56,9 @@ class TableScan:
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
 
+        self.only_read_real_buckets = True if self.table.options.get('bucket',
+                                                                     -1) == BucketMode.POSTPONE_BUCKET.value else False
+
     def plan(self) -> Plan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
@@ -64,8 +68,7 @@ class TableScan:
         file_entries = []
         for manifest_file_path in manifest_files:
             manifest_entries = self.manifest_file_manager.read(manifest_file_path,
-                                                               (lambda row: self._shard_filter(row))
-                                                               if self.idx_of_this_subtask is not None else None)
+                                                               lambda row: self._bucket_filter(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
                     file_entries.append(entry)
@@ -93,13 +96,17 @@ class TableScan:
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def _shard_filter(self, entry: Optional[ManifestEntry]) -> bool:
-        if self.table.is_primary_key_table:
-            bucket = entry.bucket
-            return bucket % self.number_of_para_subtasks == self.idx_of_this_subtask
-        else:
-            file = entry.file.file_name
-            return FixedBucketRowKeyExtractor.hash(file) % self.number_of_para_subtasks == self.idx_of_this_subtask
+    def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
+        bucket = entry.bucket
+        if self.only_read_real_buckets and bucket < 0:
+            return False
+        if self.idx_of_this_subtask is not None:
+            if self.table.is_primary_key_table:
+                return bucket % self.number_of_para_subtasks == self.idx_of_this_subtask
+            else:
+                file = entry.file.file_name
+                return FixedBucketRowKeyExtractor.hash(file) % self.number_of_para_subtasks == self.idx_of_this_subtask
+        return True
 
     def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
         if self.limit is None:
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 6c3363a586..20e0720087 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -40,7 +40,7 @@ class Schema:
     def __init__(self, fields: Optional[List[DataField]] = None, partition_keys: Optional[List[str]] = None,
                  primary_keys: Optional[List[str]] = None,
-                 options: Optional[Dict[str, str]] = None, comment: Optional[str] = None):
+                 options: Optional[Dict] = None, comment: Optional[str] = None):
         self.fields = fields if fields is not None else []
         self.partition_keys = partition_keys if partition_keys is not None else []
         self.primary_keys = primary_keys if primary_keys is not None else []
@@ -49,6 +49,6 @@ class Schema:
 
     @staticmethod
     def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]] = None,
-                            primary_keys: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None,
+                            primary_keys: Optional[List[str]] = None, options: Optional[Dict] = None,
                             comment: Optional[str] = None):
         return Schema(PyarrowFieldParser.to_paimon_schema(pa_schema), partition_keys, primary_keys, options, comment)
diff --git a/paimon-python/pypaimon/table/bucket_mode.py b/paimon-python/pypaimon/table/bucket_mode.py
index df6c8e949d..22e79bf3c3 100644
--- a/paimon-python/pypaimon/table/bucket_mode.py
+++ b/paimon-python/pypaimon/table/bucket_mode.py
@@ -27,3 +27,6 @@ class BucketMode(Enum):
     HASH_DYNAMIC = auto()
     CROSS_PARTITION = auto()
     BUCKET_UNAWARE = auto()
+    POSTPONE_MODE = auto()
+
+    POSTPONE_BUCKET = -2
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 1ceb007fae..4fcef0473d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -29,6 +29,7 @@ from pypaimon.table.table import Table
 from pypaimon.write.batch_write_builder import BatchWriteBuilder
 from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
                                               FixedBucketRowKeyExtractor,
+                                              PostponeBucketRowKeyExtractor,
                                               RowKeyExtractor,
                                               UnawareBucketRowKeyExtractor)
 
 
@@ -52,7 +53,9 @@ class FileStoreTable(Table):
 
     def bucket_mode(self) -> BucketMode:
         if self.is_primary_key_table:
-            if self.options.get(CoreOptions.BUCKET, -1) == -1:
+            if self.options.get(CoreOptions.BUCKET, -1) == -2:
+                return BucketMode.POSTPONE_MODE
+            elif self.options.get(CoreOptions.BUCKET, -1) == -1:
                 if self.cross_partition_update:
                     return BucketMode.CROSS_PARTITION
                 else:
@@ -77,6 +80,8 @@ class FileStoreTable(Table):
             return FixedBucketRowKeyExtractor(self.table_schema)
         elif bucket_mode == BucketMode.BUCKET_UNAWARE:
             return UnawareBucketRowKeyExtractor(self.table_schema)
+        elif bucket_mode == BucketMode.POSTPONE_MODE:
+            return PostponeBucketRowKeyExtractor(self.table_schema)
         elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
             return DynamicBucketRowKeyExtractor(self.table_schema)
         else:
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest_table_test.py
index a1feed8d50..9c64c17003 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_test.py
@@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import glob
+import os
 
 import pyarrow as pa
 
@@ -145,3 +147,49 @@ class RESTTableTest(RESTCatalogBaseTest):
         }
         expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
         self.assertEqual(actual, expected)
+
+    def test_postpone_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.rest_catalog.create_table('default.test_postpone', schema, False)
+        table = self.rest_catalog.get_table('default.test_postpone')
+
+        expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/manifest"))
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
+
+    def test_postpone_read_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.rest_catalog.create_table('default.test_postpone', schema, False)
+        table = self.rest_catalog.get_table('default.test_postpone')
+
+        expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits)
+        self.assertTrue(not actual)
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
index 1d1cec7382..f1f6bb526b 100644
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -27,7 +27,7 @@ from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.schema.schema import Schema
 
 
-class WriterTestCase(unittest.TestCase):
+class WriterTest(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index 801cc5fff4..bec8e08fb7 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -24,6 +24,7 @@ import pyarrow as pa
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.bucket_mode import BucketMode
 
 
 class RowKeyExtractor(ABC):
@@ -125,3 +126,16 @@ class DynamicBucketRowKeyExtractor(RowKeyExtractor):
 
     def _extract_buckets_batch(self, data: pa.RecordBatch) -> int:
         raise ValueError("Can't extract bucket from row in dynamic bucket mode")
+
+
+class PostponeBucketRowKeyExtractor(RowKeyExtractor):
+    """Extractor for postpone bucket mode (bucket = -2, primary key table)."""
+
+    def __init__(self, table_schema: TableSchema):
+        super().__init__(table_schema)
+        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -2)
+        if num_buckets != BucketMode.POSTPONE_BUCKET.value:
+            raise ValueError(f"Postpone bucket mode requires bucket = -2, got {num_buckets}")
+
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+        return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index c84cd5d39a..c11d991b84 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -25,6 +25,7 @@ import pyarrow as pa
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.row.binary_row import BinaryRow
 
 
@@ -44,7 +45,10 @@ class DataWriter(ABC):
         options = self.table.options
 
         self.target_file_size = 256 * 1024 * 1024
-        self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET)
+        self.file_format = options.get(CoreOptions.FILE_FORMAT,
+                                       CoreOptions.FILE_FORMAT_PARQUET
+                                       if self.bucket != BucketMode.POSTPONE_BUCKET.value
+                                       else CoreOptions.FILE_FORMAT_AVRO)
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
 
         self.pending_data: Optional[pa.RecordBatch] = None
@@ -134,7 +138,11 @@ class DataWriter(ABC):
         for i, field_name in enumerate(self.table.partition_keys):
             path_builder = path_builder / (field_name + "=" + str(self.partition[i]))
-        path_builder = path_builder / ("bucket-" + str(self.bucket)) / file_name
+        if self.bucket == BucketMode.POSTPONE_BUCKET.value:
+            bucket_name = "postpone"
+        else:
+            bucket_name = str(self.bucket)
+        path_builder = path_builder / ("bucket-" + bucket_name) / file_name
         return path_builder
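
For context, here is a minimal end-to-end sketch of the new postpone bucket mode,
adapted from the tests in this commit. The catalog handle `rest_catalog` and the
sample schema, data, and table name are illustrative assumptions; the API calls
mirror rest_table_test.py above. With 'bucket': -2, every row is stamped with the
placeholder bucket -2, data files land under <partition>/bucket-postpone/ as Avro,
and a plain scan returns nothing because _bucket_filter drops entries with
bucket < 0 (the data only becomes readable once it is later rewritten into real
buckets, which is outside the scope of this commit).

    import pyarrow as pa

    from pypaimon.schema.schema import Schema

    # Assumed setup (illustrative): a matching pyarrow schema and sample data.
    pa_schema = pa.schema([('user_id', pa.int64()), ('dt', pa.string()), ('amount', pa.float64())])
    data = {'user_id': [1, 2], 'dt': ['2025-08-14', '2025-08-14'], 'amount': [10.0, 20.0]}

    # 'bucket': -2 selects postpone bucket mode (BucketMode.POSTPONE_BUCKET).
    schema = Schema.from_pyarrow_schema(pa_schema,
                                        partition_keys=['user_id'],
                                        primary_keys=['user_id', 'dt'],
                                        options={'bucket': -2})
    rest_catalog.create_table('default.test_postpone', schema, False)  # `rest_catalog` assumed to exist
    table = rest_catalog.get_table('default.test_postpone')

    # Batch write: PostponeBucketRowKeyExtractor assigns bucket -2 to every row,
    # so DataWriter emits Avro files under <partition>/bucket-postpone/.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(pa.Table.from_pydict(data, schema=pa_schema))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    # Scans filter out manifest entries with bucket < 0, so the freshly written
    # postponed data is invisible to readers for now.
    read_builder = table.new_read_builder()
    table_read = read_builder.new_read()
    splits = read_builder.new_scan().plan().splits()
    assert not table_read.to_arrow(splits)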