This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1799249614 [python] bucket pruning: type-check precision and only default bucket function
1799249614 is described below
commit 1799249614d64d246a53619618be81164c7b7f7c
Author: JingsongLi <[email protected]>
AuthorDate: Sat May 9 21:11:10 2026 +0800
[python] bucket pruning: type-check precision and only default bucket function
---
.../pypaimon/manifest/manifest_file_manager.py | 12 ++++++++---
.../read/scanner/bucket_select_converter.py | 9 ++++----
.../pypaimon/read/scanner/file_scanner.py | 24 ++++++++++++++--------
paimon-python/pypaimon/schema/table_schema.py | 3 +--
4 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 308dc13a73..69e46943d3 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -106,9 +106,15 @@ class ManifestFileManager:
reader = fastavro.reader(buffer)
for record in reader:
- if early_entry_filter is not None and not early_entry_filter(
- record['_BUCKET'], record['_TOTAL_BUCKETS']):
- continue
+ if early_entry_filter is not None:
+ try:
+ bucket = record['_BUCKET']
+ total_buckets = record['_TOTAL_BUCKETS']
+ except KeyError:
+ pass
+ else:
+ if not early_entry_filter(bucket, total_buckets):
+ continue
file_dict = dict(record['_FILE'])
key_dict = dict(file_dict['_KEY_STATS'])
key_stats = SimpleStats(
diff --git a/paimon-python/pypaimon/read/scanner/bucket_select_converter.py b/paimon-python/pypaimon/read/scanner/bucket_select_converter.py
index e0c3b6bfa4..da3f0e0f47 100644
--- a/paimon-python/pypaimon/read/scanner/bucket_select_converter.py
+++ b/paimon-python/pypaimon/read/scanner/bucket_select_converter.py
@@ -102,16 +102,17 @@ MAX_VALUES = 1000
# hasn't been cross-validated against Java's ``BinaryRow`` (ARRAY,
# MAP, ROW, MULTISET, VARIANT, BLOB). Until that validation lands,
# treating them as safe risks a hash divergence.
-_UNSAFE_BUCKET_KEY_TYPES = (
+_UNSAFE_BUCKET_KEY_TYPES = frozenset({
'DECIMAL',
'TIMESTAMP',
+ 'TIMESTAMP_WITH_LOCAL_TIME_ZONE',
'ARRAY',
'MAP',
'ROW',
'MULTISET',
'VARIANT',
'BLOB',
-)
+})
def _has_unsafe_bucket_key_type(bucket_key_fields: List[DataField]) -> bool:
@@ -119,8 +120,8 @@ def _has_unsafe_bucket_key_type(bucket_key_fields: List[DataField]) -> bool:
type_name = getattr(getattr(f, 'type', None), 'type', '')
if not type_name:
continue
- head = type_name.split('(')[0].strip().upper()
- if any(head.startswith(prefix) for prefix in _UNSAFE_BUCKET_KEY_TYPES):
+ head = type_name.split('(')[0].split('<')[0].strip().upper()
+ if head in _UNSAFE_BUCKET_KEY_TYPES:
return True
return False
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 795e76a160..70cfa6c978 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -463,6 +463,12 @@ class FileScanner:
# Defensive: any catalog/proxy table that fails the mode check
# falls back to no pruning rather than crashing the scan.
return None
+ # Only the default hash function (Math.abs(hash % numBuckets)) is
+ # supported for bucket pruning. Non-default functions (mod, hive)
+ # use different algorithms and would produce wrong bucket sets.
+ bucket_func = self.table.table_schema.options.get('bucket-function.type', 'default')
+ if bucket_func.lower() != 'default':
+ return None
try:
bucket_key_fields = self.table.table_schema.logical_bucket_key_fields
except Exception:
@@ -476,15 +482,15 @@ class FileScanner:
return create_bucket_selector(self.predicate, bucket_key_fields)
def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
- # NOTE: bucket-level filtering (``only_read_real_buckets`` + the
- # predicate-driven selector) is enforced in the manifest reader's
- # early filter (see ``_build_early_bucket_filter``) so rejected
- # entries skip ``_FILE`` / partition decoding entirely. This
- # method assumes that early filter has already run; a caller that
- # bypasses ``read_entries_parallel`` and invokes this directly on
- # raw entries MUST still apply ``_build_early_bucket_filter`` (or
- # otherwise enforce ``bucket >= 0`` on POSTPONE tables) — this
- # function alone is not sound on its own.
+ # Redundant safety net: the early filter in the manifest reader
+ # already enforces these, but guard here too so this method is
+ # self-contained if called outside read_entries_parallel.
+ if self.only_read_real_buckets and entry.bucket < 0:
+ return False
+ if (self._bucket_selector is not None
+ and entry.bucket >= 0
+ and not self._bucket_selector(entry.bucket, entry.total_buckets)):
+ return False
if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
return False
# Get SimpleStatsEvolution for this schema
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 789cf4e34c..8c6e428f4a 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -99,8 +99,7 @@ class TableSchema:
declared. Mirrors Java ``TableSchema.logicalBucketKeyType()``.
"""
field_map = {f.name: f for f in self.fields}
- return [field_map[name] for name in self.bucket_keys
- if name in field_map]
+ return [field_map[name] for name in self.bucket_keys]
def to_schema(self) -> Schema:
return Schema(