This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2d0781e feat(datafusion): Add $partitions system table (#294)
2d0781e is described below
commit 2d0781e90d0ae907b5b745c44e6baf1e483eb0c2
Author: Jiajia Li <[email protected]>
AuthorDate: Mon May 18 16:12:43 2026 +0800
feat(datafusion): Add $partitions system table (#294)
---
crates/integrations/datafusion/src/sql_context.rs | 2 +-
.../datafusion/src/system_tables/mod.rs | 55 ++++-
.../datafusion/src/system_tables/partitions.rs | 234 +++++++++++++++++++++
.../integrations/datafusion/tests/system_tables.rs | 120 ++++++++++-
crates/paimon/src/catalog/partition_listing.rs | 8 +-
crates/paimon/src/spec/manifest.rs | 75 +++++++
crates/paimon/src/spec/mod.rs | 1 +
docs/src/sql.md | 24 +++
8 files changed, 511 insertions(+), 8 deletions(-)
diff --git a/crates/integrations/datafusion/src/sql_context.rs
b/crates/integrations/datafusion/src/sql_context.rs
index b54f443..973e263 100644
--- a/crates/integrations/datafusion/src/sql_context.rs
+++ b/crates/integrations/datafusion/src/sql_context.rs
@@ -506,7 +506,7 @@ impl SQLContext {
// Sort replacements by position (descending) so that replacements
// from right to left don't shift indices of earlier ones
- replacements.sort_by(|a, b| b.0 .0.cmp(&a.0 .0));
+ replacements.sort_by_key(|r| std::cmp::Reverse(r.0 .0));
// Build the rewritten SQL by replacing each clause from right to left
let mut rewritten_sql = sql.to_string();
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs
b/crates/integrations/datafusion/src/system_tables/mod.rs
index 4233667..ddc10ef 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -32,6 +32,7 @@ use crate::error::to_datafusion_error;
mod branches;
mod manifests;
mod options;
+mod partitions;
mod row_string_cast;
mod schemas;
mod snapshots;
@@ -39,6 +40,9 @@ mod tags;
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
+// Most system tables only need the base `Table`. `partitions` is special-cased
+// in `load` because it needs the catalog handle (for metastore-tracked audit
+// metadata via `Catalog::list_partitions`).
const TABLES: &[(&str, Builder)] = &[
("branches", branches::build),
("manifests", manifests::build),
@@ -48,6 +52,16 @@ const TABLES: &[(&str, Builder)] = &[
("tags", tags::build),
];
+/// Every recognised system-table suffix, matched case-insensitively by
+/// `is_registered`. Must contain every name in `TABLES`; `partitions` is the
+/// one extra entry, because it has no `Builder` and is instead routed by
+/// `load` to `partitions::build` together with the catalog handle. The
+/// `registries_stay_in_sync` test enforces this relationship.
+const SYSTEM_TABLE_NAMES: &[&str] = &[
+    "branches",
+    "manifests",
+    "options",
+    "partitions",
+    "schemas",
+    "snapshots",
+    "tags",
+];
+
/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
///
/// Mirrors Java [Identifier.splitObjectName](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java).
@@ -82,7 +96,9 @@ pub(crate) fn split_object_name(name: &str) -> (&str,
Option<&str>) {
/// Returns true if `name` is a recognised Paimon system table suffix.
pub(crate) fn is_registered(name: &str) -> bool {
- TABLES.iter().any(|(n, _)| name.eq_ignore_ascii_case(n))
+ SYSTEM_TABLE_NAMES
+ .iter()
+ .any(|n| name.eq_ignore_ascii_case(n))
}
/// Wraps an already-loaded base table as the system table `name`.
@@ -110,9 +126,14 @@ pub(crate) async fn load(
}
let identifier = Identifier::new(database, base.clone());
match catalog.get_table(&identifier).await {
- Ok(table) => wrap_to_system_table(&system_name, table)
- .expect("is_registered guarantees a builder")
- .map(Some),
+ Ok(table) => {
+ if system_name.eq_ignore_ascii_case("partitions") {
+ return partitions::build(catalog, identifier, table).map(Some);
+ }
+ wrap_to_system_table(&system_name, table)
+ .expect("is_registered guarantees a builder")
+ .map(Some)
+ }
Err(paimon::Error::TableNotExist { .. }) =>
Err(DataFusionError::Plan(format!(
"Cannot read system table `${system_name}`: \
base table `{base}` does not exist"
@@ -123,7 +144,28 @@ pub(crate) async fn load(
#[cfg(test)]
mod tests {
- use super::{is_registered, split_object_name};
+ use super::{is_registered, split_object_name, SYSTEM_TABLE_NAMES, TABLES};
+
+    /// Guards against the two registries drifting: anything in `TABLES` must
+    /// also be in `SYSTEM_TABLE_NAMES`, the only name allowed to be in
+    /// `SYSTEM_TABLE_NAMES` but not `TABLES` is `partitions` (routed via the
+    /// special path in `load`), and no suffix may be listed twice. Duplicates
+    /// are checked case-insensitively because `is_registered` compares that way.
+    #[test]
+    fn registries_stay_in_sync() {
+        for (name, _) in TABLES {
+            assert!(
+                SYSTEM_TABLE_NAMES.contains(name),
+                "`{name}` is in TABLES but missing from SYSTEM_TABLE_NAMES"
+            );
+        }
+        for name in SYSTEM_TABLE_NAMES {
+            let in_tables = TABLES.iter().any(|(n, _)| n == name);
+            assert!(
+                in_tables || *name == "partitions",
+                "`{name}` is in SYSTEM_TABLE_NAMES but has no builder and is not the special-cased `partitions`"
+            );
+        }
+        for (i, name) in SYSTEM_TABLE_NAMES.iter().enumerate() {
+            let duplicated = SYSTEM_TABLE_NAMES[i + 1..]
+                .iter()
+                .any(|other| other.eq_ignore_ascii_case(name));
+            assert!(!duplicated, "`{name}` is listed more than once in SYSTEM_TABLE_NAMES");
+        }
+    }
#[test]
fn is_registered_is_case_insensitive() {
@@ -142,6 +184,9 @@ mod tests {
assert!(is_registered("manifests"));
assert!(is_registered("Manifests"));
assert!(is_registered("MANIFESTS"));
+ assert!(is_registered("partitions"));
+ assert!(is_registered("Partitions"));
+ assert!(is_registered("PARTITIONS"));
assert!(!is_registered("nonsense"));
}
diff --git a/crates/integrations/datafusion/src/system_tables/partitions.rs
b/crates/integrations/datafusion/src/system_tables/partitions.rs
new file mode 100644
index 0000000..7fa349a
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/partitions.rs
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [PartitionsTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java).
+
+use std::any::Any;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{
+ BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray,
TimestampMillisecondArray,
+};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef,
TimeUnit};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::catalog::{Catalog, Identifier};
+use paimon::spec::{CoreOptions, Partition};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+/// Builds the `$partitions` system-table provider for `table`.
+///
+/// Unlike the other system tables this one keeps the catalog handle and the
+/// base-table identifier, because its rows come from
+/// `Catalog::list_partitions` at scan time. Only the partition keys and the
+/// configured default partition name are captured from `table` itself.
+pub(super) fn build(
+    catalog: Arc<dyn Catalog>,
+    identifier: Identifier,
+    table: Table,
+) -> DFResult<Arc<dyn TableProvider>> {
+    let schema = table.schema();
+    let core_options = CoreOptions::new(schema.options());
+    let provider = PartitionsTable {
+        catalog,
+        identifier,
+        partition_keys: schema.partition_keys().to_vec(),
+        default_partition_name: core_options.partition_default_name().to_string(),
+    };
+    Ok(Arc::new(provider))
+}
+
+/// Arrow schema of `$partitions`, built on first use and shared afterwards.
+///
+/// The four counter columns, `total_buckets` and `done` are non-nullable;
+/// the timestamp, audit and `options` columns may be NULL.
+fn partitions_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    let schema = SCHEMA.get_or_init(|| {
+        let timestamp_ms = DataType::Timestamp(TimeUnit::Millisecond, None);
+        let fields = vec![
+            Field::new("partition", DataType::Utf8, true),
+            Field::new("record_count", DataType::Int64, false),
+            Field::new("file_size_in_bytes", DataType::Int64, false),
+            Field::new("file_count", DataType::Int64, false),
+            Field::new("last_update_time", timestamp_ms.clone(), true),
+            Field::new("created_at", timestamp_ms, true),
+            Field::new("created_by", DataType::Utf8, true),
+            Field::new("updated_by", DataType::Utf8, true),
+            Field::new("options", DataType::Utf8, true),
+            Field::new("total_buckets", DataType::Int32, false),
+            Field::new("done", DataType::Boolean, false),
+        ];
+        Arc::new(Schema::new(fields))
+    });
+    Arc::clone(schema)
+}
+
+/// `$partitions` system-table provider. It stores the catalog handle and the
+/// base-table identifier, and lists partitions via `Catalog::list_partitions`
+/// on every `scan`.
+struct PartitionsTable {
+    /// Catalog queried by `scan` for the partition list.
+    catalog: Arc<dyn Catalog>,
+    /// Identifier of the base table whose partitions are listed.
+    identifier: Identifier,
+    /// Base-table partition key names, in the order `partition_keys()`
+    /// returned them; this order drives the `partition` column text.
+    partition_keys: Vec<String>,
+    /// Value rendered for partition keys that are missing from a partition spec.
+    default_partition_name: String,
+}
+
+impl std::fmt::Debug for PartitionsTable {
+    /// Prints the identifier, partition keys and default partition name. The
+    /// catalog handle is not printed; `finish_non_exhaustive` marks the
+    /// omission with `..` instead of hiding it.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PartitionsTable")
+            .field("identifier", &self.identifier)
+            .field("partition_keys", &self.partition_keys)
+            .field("default_partition_name", &self.default_partition_name)
+            .finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl TableProvider for PartitionsTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        partitions_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    /// Lists the base table's partitions through the catalog and serves them
+    /// as a single in-memory batch. `_filters` and `_limit` are ignored, so
+    /// every partition is materialised before `projection` is applied.
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        // Owned clones let the `async move` block run on the runtime helper
+        // without borrowing `self`.
+        let catalog = self.catalog.clone();
+        let identifier = self.identifier.clone();
+        let partitions = crate::runtime::await_with_runtime(async move {
+            catalog.list_partitions(&identifier).await
+        })
+        .await
+        .map_err(to_datafusion_error)?;
+
+        let batch = partitions_to_batch(
+            partitions,
+            &self.partition_keys,
+            &self.default_partition_name,
+        )?;
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            partitions_schema(),
+            projection.cloned(),
+        )?)
+    }
+}
+
+/// Converts catalog partitions into one `RecordBatch` laid out as
+/// `partitions_schema()`. Rows are ordered by their formatted `partition`
+/// string. Kept separate from `scan` so the conversion has no catalog or
+/// runtime dependency.
+///
+/// # Errors
+/// Fails if a partition's `options` map cannot be serialised to JSON, or if
+/// `RecordBatch::try_new` rejects the assembled columns.
+fn partitions_to_batch(
+    partitions: impl IntoIterator<Item = Partition>,
+    partition_keys: &[String],
+    default_partition_name: &str,
+) -> DFResult<RecordBatch> {
+    let mut rows: Vec<(String, Partition)> = partitions
+        .into_iter()
+        .map(|p| {
+            let name = format_partition_string(&p.spec, partition_keys, default_partition_name);
+            (name, p)
+        })
+        .collect();
+    // Stable sort: partitions that format identically keep catalog order.
+    rows.sort_by(|a, b| a.0.cmp(&b.0));
+
+    let n = rows.len();
+    let mut partition_strings: Vec<String> = Vec::with_capacity(n);
+    let mut record_counts = Vec::with_capacity(n);
+    let mut file_sizes = Vec::with_capacity(n);
+    let mut file_counts = Vec::with_capacity(n);
+    let mut last_update_times: Vec<Option<i64>> = Vec::with_capacity(n);
+    let mut created_ats: Vec<Option<i64>> = Vec::with_capacity(n);
+    let mut created_bys: Vec<Option<String>> = Vec::with_capacity(n);
+    let mut updated_bys: Vec<Option<String>> = Vec::with_capacity(n);
+    let mut options_jsons: Vec<Option<String>> = Vec::with_capacity(n);
+    let mut total_buckets = Vec::with_capacity(n);
+    let mut dones = Vec::with_capacity(n);
+
+    for (name, p) in rows {
+        partition_strings.push(name);
+        record_counts.push(p.record_count);
+        file_sizes.push(p.file_size_in_bytes);
+        file_counts.push(p.file_count);
+        // 0 marks "no creation_time on any file"; real wall-clock is never
+        // <= 0 in practice, so this never nullifies a genuine timestamp.
+        last_update_times
+            .push((p.last_file_creation_time > 0).then_some(p.last_file_creation_time));
+        created_ats.push(p.created_at);
+        created_bys.push(p.created_by);
+        updated_bys.push(p.updated_by);
+        // Sort via BTreeMap so the serialised JSON is deterministic across
+        // runs (Partition.options is a HashMap with unspecified order).
+        let options_json = p
+            .options
+            .as_ref()
+            .map(|m| {
+                let sorted: BTreeMap<&String, &String> = m.iter().collect();
+                serde_json::to_string(&sorted).map_err(|e| DataFusionError::External(Box::new(e)))
+            })
+            .transpose()?;
+        options_jsons.push(options_json);
+        total_buckets.push(p.total_buckets);
+        dones.push(p.done);
+    }
+
+    let batch = RecordBatch::try_new(
+        partitions_schema(),
+        vec![
+            Arc::new(StringArray::from(partition_strings)),
+            Arc::new(Int64Array::from(record_counts)),
+            Arc::new(Int64Array::from(file_sizes)),
+            Arc::new(Int64Array::from(file_counts)),
+            Arc::new(TimestampMillisecondArray::from(last_update_times)),
+            Arc::new(TimestampMillisecondArray::from(created_ats)),
+            Arc::new(StringArray::from(created_bys)),
+            Arc::new(StringArray::from(updated_bys)),
+            Arc::new(StringArray::from(options_jsons)),
+            Arc::new(Int32Array::from(total_buckets)),
+            Arc::new(BooleanArray::from(dones)),
+        ],
+    )?;
+    Ok(batch)
+}
+
+/// Formats `spec` as `key1=val1/key2=val2`, visiting keys in `partition_keys`
+/// order. Returns an empty string when `partition_keys` is empty (a
+/// non-partitioned table).
+///
+/// Keys missing from `spec` are rendered with `default_partition_name`.
+/// Entries in `spec` whose key is not in `partition_keys` are ignored.
+fn format_partition_string(
+    spec: &HashMap<String, String>,
+    partition_keys: &[String],
+    default_partition_name: &str,
+) -> String {
+    let mut out = String::new();
+    for (i, key) in partition_keys.iter().enumerate() {
+        if i > 0 {
+            out.push('/');
+        }
+        let value = spec.get(key).map_or(default_partition_name, String::as_str);
+        out.push_str(key);
+        out.push('=');
+        out.push_str(value);
+    }
+    out
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs
b/crates/integrations/datafusion/tests/system_tables.rs
index 9d96e2f..078f18b 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -21,7 +21,7 @@ mod common;
use std::sync::Arc;
-use datafusion::arrow::array::{Array, Int64Array, StringArray};
+use datafusion::arrow::array::{Array, BooleanArray, Int32Array, Int64Array,
StringArray};
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use paimon::catalog::Identifier;
@@ -622,3 +622,121 @@ fn single_int_partition_stat(value: &str) -> i32 {
.parse()
.expect("partition stats should contain one int partition value")
}
+
+/// End-to-end check of `$partitions` on the FileSystemCatalog fixture:
+/// exact schema, per-row counter invariants, row ordering, and the defaults
+/// of the catalog-tracked columns.
+#[tokio::test]
+async fn test_partitions_system_table() {
+    let (ctx, _catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$partitions");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$partitions should return ≥1 batch");
+
+    let arrow_schema = batches[0].schema();
+    let expected_columns = [
+        ("partition", DataType::Utf8),
+        ("record_count", DataType::Int64),
+        ("file_size_in_bytes", DataType::Int64),
+        ("file_count", DataType::Int64),
+        (
+            "last_update_time",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+        ),
+        (
+            "created_at",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+        ),
+        ("created_by", DataType::Utf8),
+        ("updated_by", DataType::Utf8),
+        ("options", DataType::Utf8),
+        ("total_buckets", DataType::Int32),
+        ("done", DataType::Boolean),
+    ];
+    // Pin the column count too: the per-index loop below would silently
+    // accept extra trailing columns.
+    assert_eq!(
+        arrow_schema.fields().len(),
+        expected_columns.len(),
+        "$partitions column count"
+    );
+    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+        let field = arrow_schema.field(i);
+        assert_eq!(field.name(), name, "column {i} name");
+        assert_eq!(field.data_type(), dtype, "column {i} type");
+    }
+
+    let mut partition_strings: Vec<Option<String>> = Vec::new();
+    let mut record_counts: Vec<i64> = Vec::new();
+    let mut file_counts: Vec<i64> = Vec::new();
+    let mut file_sizes: Vec<i64> = Vec::new();
+    for batch in &batches {
+        let parts = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("partition is Utf8");
+        let rc = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("record_count is Int64");
+        let sz = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("file_size_in_bytes is Int64");
+        let fc = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("file_count is Int64");
+        for i in 0..batch.num_rows() {
+            partition_strings.push(if parts.is_null(i) {
+                None
+            } else {
+                Some(parts.value(i).to_string())
+            });
+            record_counts.push(rc.value(i));
+            file_sizes.push(sz.value(i));
+            file_counts.push(fc.value(i));
+        }
+    }
+
+    for fc in &file_counts {
+        assert!(*fc >= 1, "file_count must be ≥ 1 per partition");
+    }
+    for sz in &file_sizes {
+        assert!(*sz >= 0, "file_size_in_bytes must be non-negative");
+    }
+    for rc in &record_counts {
+        assert!(*rc >= 0, "record_count must be non-negative");
+    }
+
+    let mut sorted = partition_strings.clone();
+    sorted.sort();
+    assert_eq!(
+        partition_strings, sorted,
+        "rows should be sorted by partition string"
+    );
+
+    // Cols 5-8 stay null with FileSystemCatalog (no metastore audit metadata).
+    for batch in &batches {
+        for col_idx in 5..=8 {
+            for i in 0..batch.num_rows() {
+                assert!(
+                    batch.column(col_idx).is_null(i),
+                    "column {col_idx} row {i} should be null"
+                );
+            }
+        }
+    }
+    // total_buckets defaults to 0 and done to false on FileSystemCatalog.
+    for batch in &batches {
+        let tb = batch
+            .column(9)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("total_buckets is Int32");
+        let done = batch
+            .column(10)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .expect("done is Boolean");
+        for i in 0..batch.num_rows() {
+            assert_eq!(tb.value(i), 0, "total_buckets default for FS catalog");
+            assert!(!done.value(i), "done default for FS catalog");
+        }
+    }
+}
diff --git a/crates/paimon/src/catalog/partition_listing.rs
b/crates/paimon/src/catalog/partition_listing.rs
index b71e30d..7652aba 100644
--- a/crates/paimon/src/catalog/partition_listing.rs
+++ b/crates/paimon/src/catalog/partition_listing.rs
@@ -22,7 +22,10 @@
use std::collections::{BTreeMap, HashMap};
-use crate::spec::{BinaryRow, CoreOptions, Manifest, ManifestList, Partition,
PartitionComputer};
+use crate::spec::{
+ merge_active_entries, BinaryRow, CoreOptions, Manifest, ManifestList,
Partition,
+ PartitionComputer,
+};
use crate::table::{SnapshotManager, Table};
use crate::Result;
@@ -49,6 +52,9 @@ pub async fn list_partitions_from_file_system(table: &Table)
-> Result<Vec<Parti
let entries = Manifest::read(file_io, &manifest_path).await?;
all_entries.extend(entries);
}
+ // Mirror Java AbstractFileStoreScan.readAndMergeFileEntries: drop entries
+    // shadowed by a later DELETE so file_count/record_count reflect live files.
+ let all_entries = merge_active_entries(all_entries);
let schema = table.schema();
let core = CoreOptions::new(schema.options());
diff --git a/crates/paimon/src/spec/manifest.rs
b/crates/paimon/src/spec/manifest.rs
index 8131e7a..cd16133 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -68,6 +68,27 @@ impl Manifest {
}
}
+/// Merges ADD/DELETE entries by file identifier and returns the ADD entries
+/// that are still live, in the order they appear in `entries`.
+/// Mirrors Java [FileEntry.mergeEntries](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java).
+///
+/// Assumes `entries` are in commit order, as read from the manifest list.
+/// A DELETE cancels the live ADD with the same identifier; a DELETE with no
+/// earlier ADD is dropped; a repeated ADD of a still-live identifier replaces
+/// the earlier entry.
+pub(crate) fn merge_active_entries(entries: Vec<ManifestEntry>) -> Vec<ManifestEntry> {
+    use std::collections::HashMap;
+
+    use crate::spec::manifest_entry::Identifier;
+
+    // `slots` keeps candidates in input order so the result is deterministic;
+    // `live` maps each identifier to its slot so a DELETE is an O(1) lookup.
+    let mut slots: Vec<Option<ManifestEntry>> = Vec::with_capacity(entries.len());
+    let mut live: HashMap<Identifier, usize> = HashMap::new();
+    for entry in entries {
+        match entry.kind() {
+            FileKind::Add => {
+                if let Some(prev) = live.insert(entry.identifier(), slots.len()) {
+                    slots[prev] = None;
+                }
+                slots.push(Some(entry));
+            }
+            FileKind::Delete => {
+                if let Some(idx) = live.remove(&entry.identifier()) {
+                    slots[idx] = None;
+                }
+            }
+        }
+    }
+    slots.into_iter().flatten().collect()
+}
+
#[cfg(test)]
#[cfg(not(windows))] // Skip on Windows due to path compatibility issues
mod tests {
@@ -96,4 +117,58 @@ mod tests {
assert_eq!(t2.kind(), &FileKind::Add);
assert_eq!(t2.bucket(), 2);
}
+
+    #[test]
+    fn test_merge_active_entries_cancels_add_then_delete() {
+        use crate::spec::data_file::DataFileMeta;
+        use crate::spec::stats::BinaryTableStats;
+        use crate::spec::ManifestEntry;
+
+        // Minimal entry for a 10-row, 100-byte file; only the kind, file name
+        // and level vary between the cases below.
+        fn make_entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry {
+            let empty_stats = BinaryTableStats::new(vec![], vec![], vec![]);
+            let meta = DataFileMeta {
+                file_name: file_name.to_string(),
+                file_size: 100,
+                row_count: 10,
+                min_key: vec![],
+                max_key: vec![],
+                key_stats: empty_stats.clone(),
+                value_stats: empty_stats,
+                min_sequence_number: 0,
+                max_sequence_number: 0,
+                schema_id: 0,
+                level,
+                extra_files: vec![],
+                creation_time: None,
+                delete_row_count: None,
+                embedded_index: None,
+                file_source: None,
+                value_stats_cols: None,
+                external_path: None,
+                first_row_id: None,
+                write_cols: None,
+            };
+            ManifestEntry::new(kind, vec![], 0, 1, meta, 2)
+        }
+
+        // ADD then DELETE of the same file at the same level cancels out.
+        let after_delete = merge_active_entries(vec![
+            make_entry(FileKind::Add, "f.parquet", 0),
+            make_entry(FileKind::Delete, "f.parquet", 0),
+        ]);
+        assert!(after_delete.is_empty());
+
+        // The level is part of the identifier, so both ADDs survive.
+        let per_level = merge_active_entries(vec![
+            make_entry(FileKind::Add, "f.parquet", 0),
+            make_entry(FileKind::Add, "f.parquet", 1),
+        ]);
+        assert_eq!(per_level.len(), 2);
+
+        // Compaction: level-0 file removed, re-added at level 1.
+        let after_compaction = merge_active_entries(vec![
+            make_entry(FileKind::Add, "f.parquet", 0),
+            make_entry(FileKind::Delete, "f.parquet", 0),
+            make_entry(FileKind::Add, "f.parquet", 1),
+        ]);
+        assert_eq!(after_compaction.len(), 1);
+        assert_eq!(after_compaction[0].file().level, 1);
+    }
}
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 270b27f..89de289 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -53,6 +53,7 @@ pub use index_file_meta::*;
mod index_manifest;
pub use index_manifest::{IndexManifest, IndexManifestEntry};
mod manifest;
+pub(crate) use manifest::merge_active_entries;
pub use manifest::Manifest;
mod manifest_common;
pub use manifest_common::FileKind;
diff --git a/docs/src/sql.md b/docs/src/sql.md
index 1d745d5..b075988 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -771,6 +771,30 @@ Columns:
| `min_row_id` | BIGINT | Minimum row id covered (when row tracking is enabled) |
| `max_row_id` | BIGINT | Maximum row id covered (when row tracking is enabled) |
+### $partitions
+
+View all partitions of a table with aggregated record counts and file sizes:
+
+```sql
+SELECT * FROM paimon.default.my_table$partitions;
+```
+
+Columns:
+
+| Column | Type | Description |
+|---|---|---|
+| `partition` | STRING | Partition spec, formatted as `key1=val1/key2=val2` |
+| `record_count` | BIGINT | Total record count across all data files in the partition |
+| `file_size_in_bytes` | BIGINT | Total file size in bytes |
+| `file_count` | BIGINT | Number of data files |
+| `last_update_time` | TIMESTAMP | Latest data-file creation time |
+| `created_at` | TIMESTAMP | Partition creation time (only available with metastore-tracked catalogs) |
+| `created_by` | STRING | Snapshot id that created the partition (catalog-tracked only) |
+| `updated_by` | STRING | Snapshot id that last updated the partition (catalog-tracked only) |
+| `options` | STRING | Per-partition options as flat JSON (catalog-tracked only) |
+| `total_buckets` | INT | Total bucket count for the partition (0 unless catalog-tracked) |
+| `done` | BOOLEAN | Whether the partition is marked done (false unless catalog-tracked) |
+
### Branch References
System tables support branch syntax: