This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b667d893c [python] Add $buckets system table (#7989)
5b667d893c is described below

commit 5b667d893c7c2b800cbf3f301b5a86745038cd2f
Author: Junrui Lee <[email protected]>
AuthorDate: Thu May 28 16:54:36 2026 +0800

    [python] Add $buckets system table (#7989)
---
 .../pypaimon/table/system/buckets_table.py         | 165 +++++++++++++++++++++
 .../pypaimon/table/system/system_table_loader.py   |   4 +-
 .../pypaimon/tests/system/buckets_table_test.py    | 136 +++++++++++++++++
 .../tests/system/system_table_loader_test.py       |   2 +-
 4 files changed, 305 insertions(+), 2 deletions(-)

diff --git a/paimon-python/pypaimon/table/system/buckets_table.py b/paimon-python/pypaimon/table/system/buckets_table.py
new file mode 100644
index 0000000000..ddd5e8c6c6
--- /dev/null
+++ b/paimon-python/pypaimon/table/system/buckets_table.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""The ``$buckets`` system table — per-bucket aggregated stats."""
+
+from typing import List, Optional
+
+import pyarrow
+
+from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.schema.data_types import AtomicType, DataField, RowType
+from pypaimon.table.system.system_table import SystemTable
+
+
+TABLE_TYPE = RowType(False, [
+    DataField(0, "partition", AtomicType("STRING", nullable=True)),
+    DataField(1, "bucket", AtomicType("INT", nullable=False)),
+    DataField(2, "record_count", AtomicType("BIGINT", nullable=False)),
+    DataField(3, "file_size_in_bytes", AtomicType("BIGINT", nullable=False)),
+    DataField(4, "file_count", AtomicType("BIGINT", nullable=False)),
+    DataField(5, "last_update_time", AtomicType("TIMESTAMP(3)", nullable=True)),
+])
+
+
+_TIMESTAMP_TYPE = pyarrow.timestamp("ms")
+
+
+class BucketsTable(SystemTable):
+    """The ``$buckets`` system table.
+
+    Aggregates manifest entries by (partition, bucket) to show per-bucket
+    record counts, file sizes, file counts and last update times.
+    """
+
+    def system_table_name(self) -> str:
+        return "buckets"
+
+    def row_type(self) -> RowType:
+        return TABLE_TYPE
+
+    def primary_keys(self) -> List[str]:
+        return ["partition", "bucket"]
+
+    def _build_arrow_table(self) -> pyarrow.Table:
+        snapshot = self.base_table.snapshot_manager().get_latest_snapshot()
+        if snapshot is None:
+            return self._empty_table()
+
+        manifest_list_manager = ManifestListManager(self.base_table)
+        manifest_files = manifest_list_manager.read_all(snapshot)
+        manifest_file_manager = ManifestFileManager(self.base_table)
+        entries = manifest_file_manager.read_entries_parallel(
+            manifest_files, drop_stats=True)
+
+        _NULL = object()
+
+        bucket_map: dict = {}
+        for entry in entries:
+            raw_key = tuple(
+                (field.name, _NULL if value is None else value)
+                for field, value in zip(
+                    entry.partition.fields, entry.partition.values))
+            bucket_id = int(entry.bucket)
+            key = (raw_key, bucket_id)
+
+            stats = bucket_map.get(key)
+            if stats is None:
+                render_items = tuple(
+                    (name, str(val) if val is not _NULL else None)
+                    for name, val in raw_key)
+                stats = {
+                    "render_items": render_items,
+                    "bucket": bucket_id,
+                    "record_count": 0,
+                    "file_size_in_bytes": 0,
+                    "file_count": 0,
+                    "last_update_time": None,
+                }
+                bucket_map[key] = stats
+
+            stats["record_count"] += int(entry.file.row_count)
+            stats["file_size_in_bytes"] += int(entry.file.file_size)
+            stats["file_count"] += 1
+            ct_ms = entry.file.creation_time_epoch_millis()
+            if ct_ms is not None:
+                if (stats["last_update_time"] is None
+                        or ct_ms > stats["last_update_time"]):
+                    stats["last_update_time"] = ct_ms
+
+        sorted_keys = sorted(
+            bucket_map.keys(),
+            key=lambda k: (
+                _render_partition(bucket_map[k]["render_items"]) or "",
+                k[1]))
+
+        partition_strings: List[Optional[str]] = []
+        buckets: List[int] = []
+        record_counts: List[int] = []
+        file_sizes: List[int] = []
+        file_counts: List[int] = []
+        last_update_times: List[Optional[int]] = []
+
+        for key in sorted_keys:
+            stats = bucket_map[key]
+            partition_strings.append(_render_partition(stats["render_items"]))
+            buckets.append(stats["bucket"])
+            record_counts.append(stats["record_count"])
+            file_sizes.append(stats["file_size_in_bytes"])
+            file_counts.append(stats["file_count"])
+            last_update_times.append(stats["last_update_time"])
+
+        return pyarrow.table({
+            "partition": pyarrow.array(
+                partition_strings, type=pyarrow.string()),
+            "bucket": pyarrow.array(buckets, type=pyarrow.int32()),
+            "record_count": pyarrow.array(
+                record_counts, type=pyarrow.int64()),
+            "file_size_in_bytes": pyarrow.array(
+                file_sizes, type=pyarrow.int64()),
+            "file_count": pyarrow.array(file_counts, type=pyarrow.int64()),
+            "last_update_time": pyarrow.array(
+                last_update_times, type=_TIMESTAMP_TYPE),
+        })
+
+    @staticmethod
+    def _empty_table() -> pyarrow.Table:
+        return pyarrow.table({
+            "partition": pyarrow.array([], type=pyarrow.string()),
+            "bucket": pyarrow.array([], type=pyarrow.int32()),
+            "record_count": pyarrow.array([], type=pyarrow.int64()),
+            "file_size_in_bytes": pyarrow.array([], type=pyarrow.int64()),
+            "file_count": pyarrow.array([], type=pyarrow.int64()),
+            "last_update_time": pyarrow.array([], type=_TIMESTAMP_TYPE),
+        })
+
+
+def _render_partition(spec_items) -> Optional[str]:
+    """Render a partition spec as ``pt=v/pt2=v2`` or None when empty.
+
+    Null partition values are rendered as ``__NULL__`` to distinguish them
+    from the literal string ``"None"``.  A partition whose value is
+    literally ``"__NULL__"`` will produce the same rendered string —
+    aggregation keys are still distinct, but the displayed partition
+    column will collide.  This is a display-only limitation.
+    """
+    if not spec_items:
+        return None
+    return "/".join(
+        "{}={}".format(name, "__NULL__" if value is None else value)
+        for name, value in spec_items)
diff --git a/paimon-python/pypaimon/table/system/system_table_loader.py b/paimon-python/pypaimon/table/system/system_table_loader.py
index 9d0576aec1..72b758947d 100644
--- a/paimon-python/pypaimon/table/system/system_table_loader.py
+++ b/paimon-python/pypaimon/table/system/system_table_loader.py
@@ -24,7 +24,7 @@ new module.
 The following short names are intentionally not registered here yet:
 
   audit_log, binlog, read_optimized, consumers, statistics,
-  aggregation_fields, buckets, file_key_ranges, table_indexes,
+  aggregation_fields, file_key_ranges, table_indexes,
   row_tracking, all_tables, all_partitions, all_table_options,
   catalog_options
 """
@@ -44,6 +44,7 @@ SYSTEM_TABLES: Tuple[str, ...] = (
     "manifests",
     "files",
     "partitions",
+    "buckets",
     "tags",
     "branches",
 )
@@ -66,6 +67,7 @@ SYSTEM_TABLE_LOADERS: Dict[str, Callable[..., "SystemTable"]] = {
     "manifests": _lazy("pypaimon.table.system.manifests_table", "ManifestsTable"),
     "files": _lazy("pypaimon.table.system.files_table", "FilesTable"),
     "partitions": _lazy("pypaimon.table.system.partitions_table", "PartitionsTable"),
+    "buckets": _lazy("pypaimon.table.system.buckets_table", "BucketsTable"),
     "tags": _lazy("pypaimon.table.system.tags_table", "TagsTable"),
     "branches": _lazy("pypaimon.table.system.branches_table", "BranchesTable"),
 }
diff --git a/paimon-python/pypaimon/tests/system/buckets_table_test.py b/paimon-python/pypaimon/tests/system/buckets_table_test.py
new file mode 100644
index 0000000000..57a352392a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/system/buckets_table_test.py
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""End-to-end tests for the ``$buckets`` system table."""
+
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.system.buckets_table import BucketsTable
+
+
+def _read(table):
+    rb = table.new_read_builder()
+    return rb.new_read().to_arrow(rb.new_scan().plan().splits())
+
+
+class BucketsTableTest(unittest.TestCase):
+
+    def setUp(self):
+        self.tmp = tempfile.mkdtemp(prefix="buckets_sys_")
+        warehouse = os.path.join(self.tmp, "warehouse")
+        self.catalog = CatalogFactory.create({"warehouse": warehouse})
+        self.catalog.create_database("db", False)
+
+    def tearDown(self):
+        shutil.rmtree(self.tmp, ignore_errors=True)
+
+    def _create_partitioned_table(self, num_buckets=2):
+        fields = [
+            DataField.from_dict({"id": 0, "name": "id", "type": "INT"}),
+            DataField.from_dict({"id": 1, "name": "v", "type": "STRING"}),
+            DataField.from_dict({"id": 2, "name": "dt", "type": "STRING"}),
+        ]
+        self.catalog.create_table(
+            "db.t",
+            Schema(
+                fields=fields,
+                partition_keys=["dt"],
+                options={"bucket": str(num_buckets)},
+            ),
+            False,
+        )
+
+    def _write_data(self):
+        table = self.catalog.get_table("db.t")
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        commit = write_builder.new_commit()
+        writer.write_arrow(pa.table({
+            "id": pa.array([1, 2, 3, 4], type=pa.int32()),
+            "v": ["a", "b", "c", "d"],
+            "dt": ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"],
+        }))
+        commit.commit(writer.prepare_commit())
+        writer.close()
+        commit.close()
+
+    def test_buckets_table_loaded_via_catalog(self):
+        self._create_partitioned_table()
+        table = self.catalog.get_table("db.t$buckets")
+        self.assertIsInstance(table, BucketsTable)
+
+    def test_schema_column_layout(self):
+        self._create_partitioned_table()
+        table = self.catalog.get_table("db.t$buckets")
+        row_type = table.row_type()
+        expected = [
+            ("partition", True), ("bucket", False),
+            ("record_count", False), ("file_size_in_bytes", False),
+            ("file_count", False), ("last_update_time", True),
+        ]
+        self.assertEqual([n for n, _ in expected],
+                         [f.name for f in row_type.fields])
+        for field, (_, expected_nullable) in zip(row_type.fields, expected):
+            self.assertEqual(expected_nullable, field.type.nullable,
+                             "field {} nullability".format(field.name))
+        self.assertEqual(["partition", "bucket"], table.primary_keys())
+
+    def test_empty_when_no_snapshot_exists(self):
+        self._create_partitioned_table()
+        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
+        self.assertEqual(0, arrow_table.num_rows)
+
+    def test_aggregates_by_partition_and_bucket(self):
+        self._create_partitioned_table(num_buckets=2)
+        self._write_data()
+
+        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
+        self.assertGreater(arrow_table.num_rows, 0)
+
+        partitions = arrow_table.column("partition").to_pylist()
+        self.assertTrue(all(p in ("dt=2024-01-01", "dt=2024-01-02")
+                            for p in partitions))
+
+        for size in arrow_table.column("file_size_in_bytes").to_pylist():
+            self.assertGreater(size, 0)
+        for count in arrow_table.column("file_count").to_pylist():
+            self.assertGreaterEqual(count, 1)
+
+        total_records = sum(arrow_table.column("record_count").to_pylist())
+        self.assertEqual(4, total_records)
+
+    def test_rows_sorted_by_partition_then_bucket(self):
+        self._create_partitioned_table(num_buckets=2)
+        self._write_data()
+
+        arrow_table = _read(self.catalog.get_table("db.t$buckets"))
+        partitions = arrow_table.column("partition").to_pylist()
+        buckets = arrow_table.column("bucket").to_pylist()
+
+        rows = list(zip(partitions, buckets))
+        self.assertEqual(rows, sorted(rows))
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/paimon-python/pypaimon/tests/system/system_table_loader_test.py b/paimon-python/pypaimon/tests/system/system_table_loader_test.py
index 1c8690a2bc..07b959a2c6 100644
--- a/paimon-python/pypaimon/tests/system/system_table_loader_test.py
+++ b/paimon-python/pypaimon/tests/system/system_table_loader_test.py
@@ -28,6 +28,7 @@ _EXPECTED_SYSTEM_TABLES = (
     "manifests",
     "files",
     "partitions",
+    "buckets",
     "tags",
     "branches",
 )
@@ -41,7 +42,6 @@ _UNREGISTERED_NAMES = {
     "consumers",
     "statistics",
     "aggregation_fields",
-    "buckets",
     "file_key_ranges",
     "table_indexes",
     "row_tracking",

Reply via email to