This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b6c305815c [python] Introduce postpone bucket table (#6074)
b6c305815c is described below

commit b6c305815cfdb11024c637544e182e2407621ec4
Author: umi <55790489+discivig...@users.noreply.github.com>
AuthorDate: Thu Aug 14 11:07:53 2025 +0800

    [python] Introduce postpone bucket table (#6074)
---
 .../pypaimon/manifest/manifest_file_manager.py     |  2 +-
 paimon-python/pypaimon/read/table_scan.py          | 25 +++++++----
 paimon-python/pypaimon/schema/schema.py            |  4 +-
 paimon-python/pypaimon/table/bucket_mode.py        |  3 ++
 paimon-python/pypaimon/table/file_store_table.py   |  7 +++-
 paimon-python/pypaimon/tests/rest_table_test.py    | 48 ++++++++++++++++++++++
 paimon-python/pypaimon/tests/writer_test.py        |  2 +-
 paimon-python/pypaimon/write/row_key_extractor.py  | 14 +++++++
 paimon-python/pypaimon/write/writer/data_writer.py | 12 +++++-
 9 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 8f7c8e325c..ef674e5b88 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -73,7 +73,7 @@ class ManifestFileManager:
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
             )
-            if shard_filter and not shard_filter(entry):
+            if not shard_filter(entry):
                 continue
             entries.append(entry)
         return entries
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index fa4ade3a5f..2745b1d1b3 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -29,6 +29,7 @@ from pypaimon.read.plan import Plan
 from pypaimon.read.split import Split
 from pypaimon.schema.data_types import DataField
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor
 
 
@@ -55,6 +56,9 @@ class TableScan:
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
 
+        self.only_read_real_buckets = True if self.table.options.get('bucket',
+                                                                     -1) == BucketMode.POSTPONE_BUCKET.value else False
+
     def plan(self) -> Plan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
@@ -64,8 +68,7 @@ class TableScan:
         file_entries = []
         for manifest_file_path in manifest_files:
             manifest_entries = self.manifest_file_manager.read(manifest_file_path,
-                                                               (lambda row: self._shard_filter(row))
-                                                               if self.idx_of_this_subtask is not None else None)
+                                                               lambda row: self._bucket_filter(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
                     file_entries.append(entry)
@@ -93,13 +96,17 @@ class TableScan:
         self.number_of_para_subtasks = number_of_para_subtasks
         return self
 
-    def _shard_filter(self, entry: Optional[ManifestEntry]) -> bool:
-        if self.table.is_primary_key_table:
-            bucket = entry.bucket
-            return bucket % self.number_of_para_subtasks == self.idx_of_this_subtask
-        else:
-            file = entry.file.file_name
-            return FixedBucketRowKeyExtractor.hash(file) % self.number_of_para_subtasks == self.idx_of_this_subtask
+    def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
+        bucket = entry.bucket
+        if self.only_read_real_buckets and bucket < 0:
+            return False
+        if self.idx_of_this_subtask is not None:
+            if self.table.is_primary_key_table:
+                return bucket % self.number_of_para_subtasks == self.idx_of_this_subtask
+            else:
+                file = entry.file.file_name
+                return FixedBucketRowKeyExtractor.hash(file) % self.number_of_para_subtasks == self.idx_of_this_subtask
+        return True
 
     def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
         if self.limit is None:
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 6c3363a586..20e0720087 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -40,7 +40,7 @@ class Schema:
 
     def __init__(self, fields: Optional[List[DataField]] = None, partition_keys: Optional[List[str]] = None,
                  primary_keys: Optional[List[str]] = None,
-                 options: Optional[Dict[str, str]] = None, comment: Optional[str] = None):
+                 options: Optional[Dict] = None, comment: Optional[str] = None):
         self.fields = fields if fields is not None else []
         self.partition_keys = partition_keys if partition_keys is not None else []
         self.primary_keys = primary_keys if primary_keys is not None else []
@@ -49,6 +49,6 @@ class Schema:
 
     @staticmethod
     def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]] = None,
-                            primary_keys: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None,
+                            primary_keys: Optional[List[str]] = None, options: Optional[Dict] = None,
                             comment: Optional[str] = None):
         return Schema(PyarrowFieldParser.to_paimon_schema(pa_schema), partition_keys, primary_keys, options, comment)
diff --git a/paimon-python/pypaimon/table/bucket_mode.py b/paimon-python/pypaimon/table/bucket_mode.py
index df6c8e949d..22e79bf3c3 100644
--- a/paimon-python/pypaimon/table/bucket_mode.py
+++ b/paimon-python/pypaimon/table/bucket_mode.py
@@ -27,3 +27,6 @@ class BucketMode(Enum):
     HASH_DYNAMIC = auto()
     CROSS_PARTITION = auto()
     BUCKET_UNAWARE = auto()
+    POSTPONE_MODE = auto()
+
+    POSTPONE_BUCKET = -2
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 1ceb007fae..4fcef0473d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -29,6 +29,7 @@ from pypaimon.table.table import Table
 from pypaimon.write.batch_write_builder import BatchWriteBuilder
 from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
                                               FixedBucketRowKeyExtractor,
+                                              PostponeBucketRowKeyExtractor,
                                               RowKeyExtractor,
                                               UnawareBucketRowKeyExtractor)
 
@@ -52,7 +53,9 @@ class FileStoreTable(Table):
 
     def bucket_mode(self) -> BucketMode:
         if self.is_primary_key_table:
-            if self.options.get(CoreOptions.BUCKET, -1) == -1:
+            if self.options.get(CoreOptions.BUCKET, -1) == -2:
+                return BucketMode.POSTPONE_MODE
+            elif self.options.get(CoreOptions.BUCKET, -1) == -1:
                 if self.cross_partition_update:
                     return BucketMode.CROSS_PARTITION
                 else:
@@ -77,6 +80,8 @@ class FileStoreTable(Table):
             return FixedBucketRowKeyExtractor(self.table_schema)
         elif bucket_mode == BucketMode.BUCKET_UNAWARE:
             return UnawareBucketRowKeyExtractor(self.table_schema)
+        elif bucket_mode == BucketMode.POSTPONE_MODE:
+            return PostponeBucketRowKeyExtractor(self.table_schema)
         elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
             return DynamicBucketRowKeyExtractor(self.table_schema)
         else:
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest_table_test.py
index a1feed8d50..9c64c17003 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_test.py
@@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import glob
+import os
 
 import pyarrow as pa
 
@@ -145,3 +147,49 @@ class RESTTableTest(RESTCatalogBaseTest):
         }
         expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
         self.assertEqual(actual, expected)
+
+    def test_postpone_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.rest_catalog.create_table('default.test_postpone', schema, False)
+        table = self.rest_catalog.get_table('default.test_postpone')
+
+        expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/manifest"))
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
+
+    def test_postpone_read_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+        self.rest_catalog.create_table('default.test_postpone', schema, False)
+        table = self.rest_catalog.get_table('default.test_postpone')
+
+        expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        table_write.write_arrow(expect)
+        commit_messages = table_write.prepare_commit()
+        table_commit.commit(commit_messages)
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits)
+        self.assertTrue(not actual)
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
index 1d1cec7382..f1f6bb526b 100644
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -27,7 +27,7 @@ from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.schema.schema import Schema
 
 
-class WriterTestCase(unittest.TestCase):
+class WriterTest(unittest.TestCase):
 
     @classmethod
     def setUpClass(cls):
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index 801cc5fff4..bec8e08fb7 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -24,6 +24,7 @@ import pyarrow as pa
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.bucket_mode import BucketMode
 
 
 class RowKeyExtractor(ABC):
@@ -125,3 +126,16 @@ class DynamicBucketRowKeyExtractor(RowKeyExtractor):
 
     def _extract_buckets_batch(self, data: pa.RecordBatch) -> int:
         raise ValueError("Can't extract bucket from row in dynamic bucket mode")
+
+
+class PostponeBucketRowKeyExtractor(RowKeyExtractor):
+    """Extractor for postpone bucket mode (bucket = -2, primary key tables)."""
+
+    def __init__(self, table_schema: TableSchema):
+        super().__init__(table_schema)
+        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -2)
+        if num_buckets != BucketMode.POSTPONE_BUCKET.value:
+            raise ValueError(f"Postpone bucket mode requires bucket = -2, got {num_buckets}")
+
+    def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+        return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index c84cd5d39a..c11d991b84 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -25,6 +25,7 @@ import pyarrow as pa
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.row.binary_row import BinaryRow
 
 
@@ -44,7 +45,10 @@ class DataWriter(ABC):
 
         options = self.table.options
         self.target_file_size = 256 * 1024 * 1024
-        self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET)
+        self.file_format = options.get(CoreOptions.FILE_FORMAT,
+                                       CoreOptions.FILE_FORMAT_PARQUET
+                                       if self.bucket != BucketMode.POSTPONE_BUCKET.value
+                                       else CoreOptions.FILE_FORMAT_AVRO)
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
 
         self.pending_data: Optional[pa.RecordBatch] = None
@@ -134,7 +138,11 @@ class DataWriter(ABC):
 
         for i, field_name in enumerate(self.table.partition_keys):
             path_builder = path_builder / (field_name + "=" + str(self.partition[i]))
-        path_builder = path_builder / ("bucket-" + str(self.bucket)) / file_name
+        if self.bucket == BucketMode.POSTPONE_BUCKET.value:
+            bucket_name = "postpone"
+        else:
+            bucket_name = str(self.bucket)
+        path_builder = path_builder / ("bucket-" + bucket_name) / file_name
 
         return path_builder
 

Reply via email to