This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1799249614 [python] bucket pruning: type-check precision and only default bucket function
1799249614 is described below

commit 1799249614d64d246a53619618be81164c7b7f7c
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 9 21:11:10 2026 +0800

    [python] bucket pruning: type-check precision and only default bucket function
---
 .../pypaimon/manifest/manifest_file_manager.py     | 12 ++++++++---
 .../read/scanner/bucket_select_converter.py        |  9 ++++----
 .../pypaimon/read/scanner/file_scanner.py          | 24 ++++++++++++++--------
 paimon-python/pypaimon/schema/table_schema.py      |  3 +--
 4 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 308dc13a73..69e46943d3 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -106,9 +106,15 @@ class ManifestFileManager:
         reader = fastavro.reader(buffer)
 
         for record in reader:
-            if early_entry_filter is not None and not early_entry_filter(
-                    record['_BUCKET'], record['_TOTAL_BUCKETS']):
-                continue
+            if early_entry_filter is not None:
+                try:
+                    bucket = record['_BUCKET']
+                    total_buckets = record['_TOTAL_BUCKETS']
+                except KeyError:
+                    pass
+                else:
+                    if not early_entry_filter(bucket, total_buckets):
+                        continue
             file_dict = dict(record['_FILE'])
             key_dict = dict(file_dict['_KEY_STATS'])
             key_stats = SimpleStats(
diff --git a/paimon-python/pypaimon/read/scanner/bucket_select_converter.py b/paimon-python/pypaimon/read/scanner/bucket_select_converter.py
index e0c3b6bfa4..da3f0e0f47 100644
--- a/paimon-python/pypaimon/read/scanner/bucket_select_converter.py
+++ b/paimon-python/pypaimon/read/scanner/bucket_select_converter.py
@@ -102,16 +102,17 @@ MAX_VALUES = 1000
 #      hasn't been cross-validated against Java's ``BinaryRow`` (ARRAY,
 #      MAP, ROW, MULTISET, VARIANT, BLOB). Until that validation lands,
 #      treating them as safe risks a hash divergence.
-_UNSAFE_BUCKET_KEY_TYPES = (
+_UNSAFE_BUCKET_KEY_TYPES = frozenset({
     'DECIMAL',
     'TIMESTAMP',
+    'TIMESTAMP_WITH_LOCAL_TIME_ZONE',
     'ARRAY',
     'MAP',
     'ROW',
     'MULTISET',
     'VARIANT',
     'BLOB',
-)
+})
 
 
 def _has_unsafe_bucket_key_type(bucket_key_fields: List[DataField]) -> bool:
@@ -119,8 +120,8 @@ def _has_unsafe_bucket_key_type(bucket_key_fields: List[DataField]) -> bool:
         type_name = getattr(getattr(f, 'type', None), 'type', '')
         if not type_name:
             continue
-        head = type_name.split('(')[0].strip().upper()
-        if any(head.startswith(prefix) for prefix in _UNSAFE_BUCKET_KEY_TYPES):
+        head = type_name.split('(')[0].split('<')[0].strip().upper()
+        if head in _UNSAFE_BUCKET_KEY_TYPES:
             return True
     return False
 
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 795e76a160..70cfa6c978 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -463,6 +463,12 @@ class FileScanner:
             # Defensive: any catalog/proxy table that fails the mode check
             # falls back to no pruning rather than crashing the scan.
             return None
+        # Only the default hash function (Math.abs(hash % numBuckets)) is
+        # supported for bucket pruning. Non-default functions (mod, hive)
+        # use different algorithms and would produce wrong bucket sets.
+        bucket_func = 
self.table.table_schema.options.get('bucket-function.type', 'default')
+        if bucket_func.lower() != 'default':
+            return None
         try:
             bucket_key_fields = 
self.table.table_schema.logical_bucket_key_fields
         except Exception:
@@ -476,15 +482,15 @@ class FileScanner:
         return create_bucket_selector(self.predicate, bucket_key_fields)
 
     def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
-        # NOTE: bucket-level filtering (``only_read_real_buckets`` + the
-        # predicate-driven selector) is enforced in the manifest reader's
-        # early filter (see ``_build_early_bucket_filter``) so rejected
-        # entries skip ``_FILE`` / partition decoding entirely. This
-        # method assumes that early filter has already run; a caller that
-        # bypasses ``read_entries_parallel`` and invokes this directly on
-        # raw entries MUST still apply ``_build_early_bucket_filter`` (or
-        # otherwise enforce ``bucket >= 0`` on POSTPONE tables) — this
-        # function alone is not sound on its own.
+        # Redundant safety net: the early filter in the manifest reader
+        # already enforces these, but guard here too so this method is
+        # self-contained if called outside read_entries_parallel.
+        if self.only_read_real_buckets and entry.bucket < 0:
+            return False
+        if (self._bucket_selector is not None
+                and entry.bucket >= 0
+                and not self._bucket_selector(entry.bucket, 
entry.total_buckets)):
+            return False
         if self.partition_key_predicate and not 
self.partition_key_predicate.test(entry.partition):
             return False
         # Get SimpleStatsEvolution for this schema
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 789cf4e34c..8c6e428f4a 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -99,8 +99,7 @@ class TableSchema:
         declared. Mirrors Java ``TableSchema.logicalBucketKeyType()``.
         """
         field_map = {f.name: f for f in self.fields}
-        return [field_map[name] for name in self.bucket_keys
-                if name in field_map]
+        return [field_map[name] for name in self.bucket_keys]
 
     def to_schema(self) -> Schema:
         return Schema(

Reply via email to