This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5b667d893c [python] Add $buckets system table (#7989)
5b667d893c is described below
commit 5b667d893c7c2b800cbf3f301b5a86745038cd2f
Author: Junrui Lee <[email protected]>
AuthorDate: Thu May 28 16:54:36 2026 +0800
[python] Add $buckets system table (#7989)
---
.../pypaimon/table/system/buckets_table.py | 165 +++++++++++++++++++++
.../pypaimon/table/system/system_table_loader.py | 4 +-
.../pypaimon/tests/system/buckets_table_test.py | 136 +++++++++++++++++
.../tests/system/system_table_loader_test.py | 2 +-
4 files changed, 305 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/table/system/buckets_table.py b/paimon-python/pypaimon/table/system/buckets_table.py
new file mode 100644
index 0000000000..ddd5e8c6c6
--- /dev/null
+++ b/paimon-python/pypaimon/table/system/buckets_table.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""The ``$buckets`` system table — per-bucket aggregated stats."""
+
+from typing import List, Optional
+
+import pyarrow
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.schema.data_types import AtomicType, DataField, RowType
+from pypaimon.table.system.system_table import SystemTable
+
+
+# Row type of the ``$buckets`` system table, returned by
+# ``BucketsTable.row_type``. Field ids, order and nullability are the
+# table's public schema.
+TABLE_TYPE = RowType(False, [
+    DataField(0, "partition", AtomicType("STRING", nullable=True)),
+    DataField(1, "bucket", AtomicType("INT", nullable=False)),
+    DataField(2, "record_count", AtomicType("BIGINT", nullable=False)),
+    DataField(3, "file_size_in_bytes", AtomicType("BIGINT", nullable=False)),
+    DataField(4, "file_count", AtomicType("BIGINT", nullable=False)),
+    DataField(5, "last_update_time",
+              AtomicType("TIMESTAMP(3)", nullable=True)),
+])
+
+
+# Arrow type of ``last_update_time``: millisecond unit, matching the
+# 3 fractional digits of TIMESTAMP(3).
+_TIMESTAMP_TYPE = pyarrow.timestamp("ms")
+
+
+class BucketsTable(SystemTable):
+    """The ``$buckets`` system table.
+
+    Aggregates the manifest entries of the latest snapshot by
+    (partition, bucket) into one row per bucket: total row count, total
+    file size, number of data files and the newest file creation time.
+    """
+
+    def system_table_name(self) -> str:
+        return "buckets"
+
+    def row_type(self) -> RowType:
+        return TABLE_TYPE
+
+    def primary_keys(self) -> List[str]:
+        return ["partition", "bucket"]
+
+    def _build_arrow_table(self) -> pyarrow.Table:
+        """Build the table contents from the latest snapshot.
+
+        Returns a zero-row table when the base table has no snapshot yet.
+        Rows are ordered by rendered partition string (an unpartitioned
+        table sorts as ``""``), then by bucket id.
+        """
+        snapshot = self.base_table.snapshot_manager().get_latest_snapshot()
+        if snapshot is None:
+            return self._empty_table()
+
+        manifest_files = ManifestListManager(self.base_table).read_all(
+            snapshot)
+        entries = ManifestFileManager(self.base_table).read_entries_parallel(
+            manifest_files, drop_stats=True)
+
+        # Key: (partition spec as (name, raw value) pairs, bucket id).
+        # ``None`` is a valid dict-key component and distinct from the
+        # string "None", so null partition values aggregate separately
+        # without needing a sentinel object.
+        bucket_map: dict = {}
+        for entry in entries:
+            partition_key = tuple(
+                (field.name, value)
+                for field, value in zip(
+                    entry.partition.fields, entry.partition.values))
+            bucket_id = int(entry.bucket)
+            key = (partition_key, bucket_id)
+
+            stats = bucket_map.get(key)
+            if stats is None:
+                stats = {
+                    # Rendered once here; reused for ordering and output.
+                    "partition": _render_partition(tuple(
+                        (name, None if value is None else str(value))
+                        for name, value in partition_key)),
+                    "bucket": bucket_id,
+                    "record_count": 0,
+                    "file_size_in_bytes": 0,
+                    "file_count": 0,
+                    "last_update_time": None,
+                }
+                bucket_map[key] = stats
+
+            stats["record_count"] += int(entry.file.row_count)
+            stats["file_size_in_bytes"] += int(entry.file.file_size)
+            stats["file_count"] += 1
+            # Track the newest creation time (epoch millis); files that
+            # report none leave the current value untouched.
+            created_ms = entry.file.creation_time_epoch_millis()
+            if created_ms is not None and (
+                    stats["last_update_time"] is None
+                    or created_ms > stats["last_update_time"]):
+                stats["last_update_time"] = created_ms
+
+        rows = sorted(
+            bucket_map.values(),
+            key=lambda stats: (stats["partition"] or "", stats["bucket"]))
+        return self._to_arrow_table(rows)
+
+    @staticmethod
+    def _empty_table() -> pyarrow.Table:
+        """Return a zero-row table carrying the ``$buckets`` Arrow schema."""
+        return BucketsTable._to_arrow_table([])
+
+    @staticmethod
+    def _to_arrow_table(rows: List[dict]) -> pyarrow.Table:
+        """Convert per-bucket stats dicts into the ``$buckets`` Arrow table.
+
+        Both the empty and the populated path go through here, so column
+        names and Arrow types are declared in exactly one place.
+        ``last_update_time`` values are epoch milliseconds or None.
+        """
+        def column(name):
+            return [row[name] for row in rows]
+
+        return pyarrow.table({
+            "partition": pyarrow.array(
+                column("partition"), type=pyarrow.string()),
+            "bucket": pyarrow.array(column("bucket"), type=pyarrow.int32()),
+            "record_count": pyarrow.array(
+                column("record_count"), type=pyarrow.int64()),
+            "file_size_in_bytes": pyarrow.array(
+                column("file_size_in_bytes"), type=pyarrow.int64()),
+            "file_count": pyarrow.array(
+                column("file_count"), type=pyarrow.int64()),
+            "last_update_time": pyarrow.array(
+                column("last_update_time"), type=_TIMESTAMP_TYPE),
+        })
+
+
+def _render_partition(spec_items) -> Optional[str]:
+    """Format ``(name, value)`` pairs as ``name=value`` joined by ``/``.
+
+    An empty spec (unpartitioned table) yields ``None``. A ``None`` value
+    is printed as ``__NULL__`` so it is not mistaken for the string
+    ``"None"``. A genuine value equal to ``"__NULL__"`` therefore prints
+    the same way. Only the displayed string collides; bucket aggregation
+    is keyed on the raw values, so the stats themselves stay separate.
+    """
+    if not spec_items:
+        return None
+    segments = []
+    for name, value in spec_items:
+        shown = "__NULL__" if value is None else value
+        segments.append("{}={}".format(name, shown))
+    return "/".join(segments)
diff --git a/paimon-python/pypaimon/table/system/system_table_loader.py b/paimon-python/pypaimon/table/system/system_table_loader.py
index 9d0576aec1..72b758947d 100644
--- a/paimon-python/pypaimon/table/system/system_table_loader.py
+++ b/paimon-python/pypaimon/table/system/system_table_loader.py
@@ -24,7 +24,7 @@ new module.
The following short names are intentionally not registered here yet:
audit_log, binlog, read_optimized, consumers, statistics,
- aggregation_fields, buckets, file_key_ranges, table_indexes,
+ aggregation_fields, file_key_ranges, table_indexes,
row_tracking, all_tables, all_partitions, all_table_options,
catalog_options
"""
@@ -44,6 +44,7 @@ SYSTEM_TABLES: Tuple[str, ...] = (
"manifests",
"files",
"partitions",
+ "buckets",
"tags",
"branches",
)
@@ -66,6 +67,7 @@ SYSTEM_TABLE_LOADERS: Dict[str, Callable[..., "SystemTable"]] = {
"manifests": _lazy("pypaimon.table.system.manifests_table",
"ManifestsTable"),
"files": _lazy("pypaimon.table.system.files_table", "FilesTable"),
"partitions": _lazy("pypaimon.table.system.partitions_table",
"PartitionsTable"),
+ "buckets": _lazy("pypaimon.table.system.buckets_table", "BucketsTable"),
"tags": _lazy("pypaimon.table.system.tags_table", "TagsTable"),
"branches": _lazy("pypaimon.table.system.branches_table", "BranchesTable"),
}
diff --git a/paimon-python/pypaimon/tests/system/buckets_table_test.py b/paimon-python/pypaimon/tests/system/buckets_table_test.py
new file mode 100644
index 0000000000..57a352392a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/system/buckets_table_test.py
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""End-to-end tests for the ``$buckets`` system table."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.system.buckets_table import BucketsTable
+
+
+def _read(table):
+    """Plan all splits of ``table`` and read them into one Arrow table."""
+    read_builder = table.new_read_builder()
+    reader = read_builder.new_read()
+    splits = read_builder.new_scan().plan().splits()
+    return reader.to_arrow(splits)
+
+
+class BucketsTableTest(unittest.TestCase):
+    """End-to-end checks of ``$buckets`` against a filesystem catalog."""
+
+    def setUp(self):
+        self.tmp = tempfile.mkdtemp(prefix="buckets_sys_")
+        warehouse = os.path.join(self.tmp, "warehouse")
+        self.catalog = CatalogFactory.create({"warehouse": warehouse})
+        self.catalog.create_database("db", False)
+
+    def tearDown(self):
+        shutil.rmtree(self.tmp, ignore_errors=True)
+
+    def _create_partitioned_table(self, num_buckets=2):
+        """Create ``db.t`` (id INT, v STRING, dt STRING) partitioned by dt."""
+        fields = [
+            DataField.from_dict({"id": 0, "name": "id", "type": "INT"}),
+            DataField.from_dict({"id": 1, "name": "v", "type": "STRING"}),
+            DataField.from_dict({"id": 2, "name": "dt", "type": "STRING"}),
+        ]
+        self.catalog.create_table(
+            "db.t",
+            Schema(
+                fields=fields,
+                partition_keys=["dt"],
+                options={"bucket": str(num_buckets)},
+            ),
+            False,
+        )
+
+    def _write_data(self):
+        """Commit four rows to ``db.t``: two in each ``dt`` partition."""
+        table = self.catalog.get_table("db.t")
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        commit = write_builder.new_commit()
+        try:
+            writer.write_arrow(pa.table({
+                "id": pa.array([1, 2, 3, 4], type=pa.int32()),
+                "v": ["a", "b", "c", "d"],
+                "dt": ["2024-01-01", "2024-01-01",
+                       "2024-01-02", "2024-01-02"],
+            }))
+            commit.commit(writer.prepare_commit())
+        finally:
+            # Close both even when the write or the commit raises.
+            writer.close()
+            commit.close()
+
+    def test_buckets_table_loaded_via_catalog(self):
+        self._create_partitioned_table()
+        table = self.catalog.get_table("db.t$buckets")
+        self.assertIsInstance(table, BucketsTable)
+
+    def test_schema_column_layout(self):
+        self._create_partitioned_table()
+        table = self.catalog.get_table("db.t$buckets")
+        row_type = table.row_type()
+        expected = [
+            ("partition", True), ("bucket", False),
+            ("record_count", False), ("file_size_in_bytes", False),
+            ("file_count", False), ("last_update_time", True),
+        ]
+        self.assertEqual([n for n, _ in expected],
+                         [f.name for f in row_type.fields])
+        for field, (_, expected_nullable) in zip(row_type.fields, expected):
+            self.assertEqual(expected_nullable, field.type.nullable,
+                             "field {} nullability".format(field.name))
+        self.assertEqual(["partition", "bucket"], table.primary_keys())
+
+    def test_empty_when_no_snapshot_exists(self):
+        self._create_partitioned_table()
+        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
+        self.assertEqual(0, arrow_table.num_rows)
+
+    def test_aggregates_by_partition_and_bucket(self):
+        num_buckets = 2
+        self._create_partitioned_table(num_buckets=num_buckets)
+        self._write_data()
+
+        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
+        self.assertGreater(arrow_table.num_rows, 0)
+
+        partitions = arrow_table.column("partition").to_pylist()
+        buckets = arrow_table.column("bucket").to_pylist()
+        record_counts = arrow_table.column("record_count").to_pylist()
+
+        # Every written partition appears, and nothing else does.
+        self.assertEqual({"dt=2024-01-01", "dt=2024-01-02"}, set(partitions))
+        # (partition, bucket) is the primary key: at most one row per pair.
+        keys = list(zip(partitions, buckets))
+        self.assertEqual(len(keys), len(set(keys)))
+        for bucket in buckets:
+            self.assertIn(bucket, range(num_buckets))
+
+        for size in arrow_table.column("file_size_in_bytes").to_pylist():
+            self.assertGreater(size, 0)
+        for count in arrow_table.column("file_count").to_pylist():
+            self.assertGreaterEqual(count, 1)
+
+        # _write_data puts exactly two rows into each partition.
+        records_per_partition = {}
+        for partition, count in zip(partitions, record_counts):
+            records_per_partition[partition] = (
+                records_per_partition.get(partition, 0) + count)
+        self.assertEqual({"dt=2024-01-01": 2, "dt=2024-01-02": 2},
+                         records_per_partition)
+
+    def test_rows_sorted_by_partition_then_bucket(self):
+        self._create_partitioned_table(num_buckets=2)
+        self._write_data()
+
+        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
+        partitions = arrow_table.column("partition").to_pylist()
+        buckets = arrow_table.column("bucket").to_pylist()
+
+        rows = list(zip(partitions, buckets))
+        self.assertEqual(rows, sorted(rows))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/system/system_table_loader_test.py b/paimon-python/pypaimon/tests/system/system_table_loader_test.py
index 1c8690a2bc..07b959a2c6 100644
--- a/paimon-python/pypaimon/tests/system/system_table_loader_test.py
+++ b/paimon-python/pypaimon/tests/system/system_table_loader_test.py
@@ -28,6 +28,7 @@ _EXPECTED_SYSTEM_TABLES = (
"manifests",
"files",
"partitions",
+ "buckets",
"tags",
"branches",
)
@@ -41,7 +42,6 @@ _UNREGISTERED_NAMES = {
"consumers",
"statistics",
"aggregation_fields",
- "buckets",
"file_key_ranges",
"table_indexes",
"row_tracking",