This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d0781e  feat(datafusion): Add $partitions system table (#294)
2d0781e is described below

commit 2d0781e90d0ae907b5b745c44e6baf1e483eb0c2
Author: Jiajia Li <[email protected]>
AuthorDate: Mon May 18 16:12:43 2026 +0800

    feat(datafusion): Add $partitions system table (#294)
---
 crates/integrations/datafusion/src/sql_context.rs  |   2 +-
 .../datafusion/src/system_tables/mod.rs            |  55 ++++-
 .../datafusion/src/system_tables/partitions.rs     | 234 +++++++++++++++++++++
 .../integrations/datafusion/tests/system_tables.rs | 120 ++++++++++-
 crates/paimon/src/catalog/partition_listing.rs     |   8 +-
 crates/paimon/src/spec/manifest.rs                 |  75 +++++++
 crates/paimon/src/spec/mod.rs                      |   1 +
 docs/src/sql.md                                    |  24 +++
 8 files changed, 511 insertions(+), 8 deletions(-)

diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs
index b54f443..973e263 100644
--- a/crates/integrations/datafusion/src/sql_context.rs
+++ b/crates/integrations/datafusion/src/sql_context.rs
@@ -506,7 +506,7 @@ impl SQLContext {
 
         // Sort replacements by position (descending) so that replacements
         // from right to left don't shift indices of earlier ones
-        replacements.sort_by(|a, b| b.0 .0.cmp(&a.0 .0));
+        replacements.sort_by_key(|r| std::cmp::Reverse(r.0 .0));
 
         // Build the rewritten SQL by replacing each clause from right to left
         let mut rewritten_sql = sql.to_string();
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs b/crates/integrations/datafusion/src/system_tables/mod.rs
index 4233667..ddc10ef 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -32,6 +32,7 @@ use crate::error::to_datafusion_error;
 mod branches;
 mod manifests;
 mod options;
+mod partitions;
 mod row_string_cast;
 mod schemas;
 mod snapshots;
@@ -39,6 +40,9 @@ mod tags;
 
 type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
 
+// Most system tables only need the base `Table`. `partitions` is special-cased
+// in `load` because it needs the catalog handle (for metastore-tracked audit
+// metadata via `Catalog::list_partitions`).
 const TABLES: &[(&str, Builder)] = &[
     ("branches", branches::build),
     ("manifests", manifests::build),
@@ -48,6 +52,16 @@ const TABLES: &[(&str, Builder)] = &[
     ("tags", tags::build),
 ];
 
+const SYSTEM_TABLE_NAMES: &[&str] = &[
+    "branches",
+    "manifests",
+    "options",
+    "partitions",
+    "schemas",
+    "snapshots",
+    "tags",
+];
+
 /// Parse a Paimon object name into `(base_table, optional system_table_name)`.
 ///
 /// Mirrors Java [Identifier.splitObjectName](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java).
@@ -82,7 +96,9 @@ pub(crate) fn split_object_name(name: &str) -> (&str, Option<&str>) {
 
 /// Returns true if `name` is a recognised Paimon system table suffix.
 pub(crate) fn is_registered(name: &str) -> bool {
-    TABLES.iter().any(|(n, _)| name.eq_ignore_ascii_case(n))
+    SYSTEM_TABLE_NAMES
+        .iter()
+        .any(|n| name.eq_ignore_ascii_case(n))
 }
 
 /// Wraps an already-loaded base table as the system table `name`.
@@ -110,9 +126,14 @@ pub(crate) async fn load(
     }
     let identifier = Identifier::new(database, base.clone());
     match catalog.get_table(&identifier).await {
-        Ok(table) => wrap_to_system_table(&system_name, table)
-            .expect("is_registered guarantees a builder")
-            .map(Some),
+        Ok(table) => {
+            if system_name.eq_ignore_ascii_case("partitions") {
+                return partitions::build(catalog, identifier, table).map(Some);
+            }
+            wrap_to_system_table(&system_name, table)
+                .expect("is_registered guarantees a builder")
+                .map(Some)
+        }
         Err(paimon::Error::TableNotExist { .. }) => 
Err(DataFusionError::Plan(format!(
             "Cannot read system table `${system_name}`: \
              base table `{base}` does not exist"
@@ -123,7 +144,28 @@ pub(crate) async fn load(
 
 #[cfg(test)]
 mod tests {
-    use super::{is_registered, split_object_name};
+    use super::{is_registered, split_object_name, SYSTEM_TABLE_NAMES, TABLES};
+
+    /// Guards against the two registries drifting: anything in `TABLES` must
+    /// also be in `SYSTEM_TABLE_NAMES`, and the only name allowed to be in
+    /// `SYSTEM_TABLE_NAMES` but not `TABLES` is `partitions` (routed via the
+    /// special path in `load`).
+    #[test]
+    fn registries_stay_in_sync() {
+        for (name, _) in TABLES {
+            assert!(
+                SYSTEM_TABLE_NAMES.contains(name),
+                "`{name}` is in TABLES but missing from SYSTEM_TABLE_NAMES"
+            );
+        }
+        for name in SYSTEM_TABLE_NAMES {
+            let in_tables = TABLES.iter().any(|(n, _)| n == name);
+            assert!(
+                in_tables || *name == "partitions",
+                "`{name}` is in SYSTEM_TABLE_NAMES but has no builder and is 
not the special-cased `partitions`"
+            );
+        }
+    }
 
     #[test]
     fn is_registered_is_case_insensitive() {
@@ -142,6 +184,9 @@ mod tests {
         assert!(is_registered("manifests"));
         assert!(is_registered("Manifests"));
         assert!(is_registered("MANIFESTS"));
+        assert!(is_registered("partitions"));
+        assert!(is_registered("Partitions"));
+        assert!(is_registered("PARTITIONS"));
         assert!(!is_registered("nonsense"));
     }
 
diff --git a/crates/integrations/datafusion/src/system_tables/partitions.rs b/crates/integrations/datafusion/src/system_tables/partitions.rs
new file mode 100644
index 0000000..7fa349a
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/partitions.rs
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java [PartitionsTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java).
+
+use std::any::Any;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{
+    BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray, 
TimestampMillisecondArray,
+};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, 
TimeUnit};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::{DataFusionError, Result as DFResult};
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::catalog::{Catalog, Identifier};
+use paimon::spec::{CoreOptions, Partition};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+
+pub(super) fn build(
+    catalog: Arc<dyn Catalog>,
+    identifier: Identifier,
+    table: Table,
+) -> DFResult<Arc<dyn TableProvider>> {
+    let table_schema = table.schema();
+    let partition_keys = table_schema.partition_keys().to_vec();
+    let default_partition_name = CoreOptions::new(table_schema.options())
+        .partition_default_name()
+        .to_string();
+    Ok(Arc::new(PartitionsTable {
+        catalog,
+        identifier,
+        partition_keys,
+        default_partition_name,
+    }))
+}
+
+fn partitions_schema() -> SchemaRef {
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+    SCHEMA
+        .get_or_init(|| {
+            Arc::new(Schema::new(vec![
+                Field::new("partition", DataType::Utf8, true),
+                Field::new("record_count", DataType::Int64, false),
+                Field::new("file_size_in_bytes", DataType::Int64, false),
+                Field::new("file_count", DataType::Int64, false),
+                Field::new(
+                    "last_update_time",
+                    DataType::Timestamp(TimeUnit::Millisecond, None),
+                    true,
+                ),
+                Field::new(
+                    "created_at",
+                    DataType::Timestamp(TimeUnit::Millisecond, None),
+                    true,
+                ),
+                Field::new("created_by", DataType::Utf8, true),
+                Field::new("updated_by", DataType::Utf8, true),
+                Field::new("options", DataType::Utf8, true),
+                Field::new("total_buckets", DataType::Int32, false),
+                Field::new("done", DataType::Boolean, false),
+            ]))
+        })
+        .clone()
+}
+
+struct PartitionsTable {
+    catalog: Arc<dyn Catalog>,
+    identifier: Identifier,
+    partition_keys: Vec<String>,
+    default_partition_name: String,
+}
+
+impl std::fmt::Debug for PartitionsTable {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PartitionsTable")
+            .field("identifier", &self.identifier)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl TableProvider for PartitionsTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        partitions_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let catalog = self.catalog.clone();
+        let identifier = self.identifier.clone();
+        let partitions = crate::runtime::await_with_runtime(async move {
+            catalog.list_partitions(&identifier).await
+        })
+        .await
+        .map_err(to_datafusion_error)?;
+
+        let mut rows: Vec<(String, Partition)> = partitions
+            .into_iter()
+            .map(|p| {
+                let s = format_partition_string(
+                    &p.spec,
+                    &self.partition_keys,
+                    &self.default_partition_name,
+                );
+                (s, p)
+            })
+            .collect();
+        rows.sort_by(|a, b| a.0.cmp(&b.0));
+
+        let n = rows.len();
+        let mut partition_strings: Vec<Option<String>> = Vec::with_capacity(n);
+        let mut record_counts = Vec::with_capacity(n);
+        let mut file_sizes = Vec::with_capacity(n);
+        let mut file_counts = Vec::with_capacity(n);
+        let mut last_update_times: Vec<Option<i64>> = Vec::with_capacity(n);
+        let mut created_ats: Vec<Option<i64>> = Vec::with_capacity(n);
+        let mut created_bys: Vec<Option<String>> = Vec::with_capacity(n);
+        let mut updated_bys: Vec<Option<String>> = Vec::with_capacity(n);
+        let mut options_jsons: Vec<Option<String>> = Vec::with_capacity(n);
+        let mut total_buckets = Vec::with_capacity(n);
+        let mut dones = Vec::with_capacity(n);
+
+        for (s, p) in rows {
+            partition_strings.push(Some(s));
+            record_counts.push(p.record_count);
+            file_sizes.push(p.file_size_in_bytes);
+            file_counts.push(p.file_count);
+            // 0 marks "no creation_time on any file"; real wall-clock is never
+            // <= 0 in practice, so this never nullifies a genuine timestamp.
+            last_update_times.push(if p.last_file_creation_time > 0 {
+                Some(p.last_file_creation_time)
+            } else {
+                None
+            });
+            created_ats.push(p.created_at);
+            created_bys.push(p.created_by);
+            updated_bys.push(p.updated_by);
+            // Sort via BTreeMap so the serialised JSON is deterministic across
+            // runs (Partition.options is a HashMap with unspecified order).
+            options_jsons.push(
+                p.options
+                    .as_ref()
+                    .map(|m| {
+                        let sorted: BTreeMap<&String, &String> = 
m.iter().collect();
+                        serde_json::to_string(&sorted)
+                            .map_err(|e| 
DataFusionError::External(Box::new(e)))
+                    })
+                    .transpose()?,
+            );
+            total_buckets.push(p.total_buckets);
+            dones.push(p.done);
+        }
+
+        let schema = partitions_schema();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(partition_strings)),
+                Arc::new(Int64Array::from(record_counts)),
+                Arc::new(Int64Array::from(file_sizes)),
+                Arc::new(Int64Array::from(file_counts)),
+                Arc::new(TimestampMillisecondArray::from(last_update_times)),
+                Arc::new(TimestampMillisecondArray::from(created_ats)),
+                Arc::new(StringArray::from(created_bys)),
+                Arc::new(StringArray::from(updated_bys)),
+                Arc::new(StringArray::from(options_jsons)),
+                Arc::new(Int32Array::from(total_buckets)),
+                Arc::new(BooleanArray::from(dones)),
+            ],
+        )?;
+
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![batch]],
+            schema,
+            projection.cloned(),
+        )?)
+    }
+}
+
+/// Format `spec` as `key1=val1/key2=val2` in `partition_keys` order. Empty
+/// string for non-partitioned tables. NULL spec values fall back to
+/// `default_partition_name`.
+fn format_partition_string(
+    spec: &HashMap<String, String>,
+    partition_keys: &[String],
+    default_partition_name: &str,
+) -> String {
+    partition_keys
+        .iter()
+        .map(|k| {
+            let v = spec
+                .get(k)
+                .map(String::as_str)
+                .unwrap_or(default_partition_name);
+            format!("{k}={v}")
+        })
+        .collect::<Vec<_>>()
+        .join("/")
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs b/crates/integrations/datafusion/tests/system_tables.rs
index 9d96e2f..078f18b 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -21,7 +21,7 @@ mod common;
 
 use std::sync::Arc;
 
-use datafusion::arrow::array::{Array, Int64Array, StringArray};
+use datafusion::arrow::array::{Array, BooleanArray, Int32Array, Int64Array, 
StringArray};
 use datafusion::arrow::datatypes::{DataType, TimeUnit};
 use datafusion::arrow::record_batch::RecordBatch;
 use paimon::catalog::Identifier;
@@ -622,3 +622,121 @@ fn single_int_partition_stat(value: &str) -> i32 {
         .parse()
         .expect("partition stats should contain one int partition value")
 }
+
+#[tokio::test]
+async fn test_partitions_system_table() {
+    let (ctx, _catalog, _tmp) = create_context().await;
+    let sql = format!("SELECT * FROM 
paimon.default.{FIXTURE_TABLE}$partitions");
+    let batches = run_sql(&ctx, &sql).await;
+
+    assert!(!batches.is_empty(), "$partitions should return ≥1 batch");
+
+    let arrow_schema = batches[0].schema();
+    let expected_columns = [
+        ("partition", DataType::Utf8),
+        ("record_count", DataType::Int64),
+        ("file_size_in_bytes", DataType::Int64),
+        ("file_count", DataType::Int64),
+        (
+            "last_update_time",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+        ),
+        (
+            "created_at",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+        ),
+        ("created_by", DataType::Utf8),
+        ("updated_by", DataType::Utf8),
+        ("options", DataType::Utf8),
+        ("total_buckets", DataType::Int32),
+        ("done", DataType::Boolean),
+    ];
+    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+        let field = arrow_schema.field(i);
+        assert_eq!(field.name(), name, "column {i} name");
+        assert_eq!(field.data_type(), dtype, "column {i} type");
+    }
+
+    let mut partition_strings: Vec<Option<String>> = Vec::new();
+    let mut record_counts: Vec<i64> = Vec::new();
+    let mut file_counts: Vec<i64> = Vec::new();
+    let mut file_sizes: Vec<i64> = Vec::new();
+    for batch in &batches {
+        let parts = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("partition is Utf8");
+        let rc = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("record_count is Int64");
+        let sz = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("file_size_in_bytes is Int64");
+        let fc = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .expect("file_count is Int64");
+        for i in 0..batch.num_rows() {
+            partition_strings.push(if parts.is_null(i) {
+                None
+            } else {
+                Some(parts.value(i).to_string())
+            });
+            record_counts.push(rc.value(i));
+            file_sizes.push(sz.value(i));
+            file_counts.push(fc.value(i));
+        }
+    }
+
+    for fc in &file_counts {
+        assert!(*fc >= 1, "file_count must be ≥ 1 per partition");
+    }
+    for sz in &file_sizes {
+        assert!(*sz >= 0, "file_size_in_bytes must be non-negative");
+    }
+    for rc in &record_counts {
+        assert!(*rc >= 0, "record_count must be non-negative");
+    }
+
+    let mut sorted = partition_strings.clone();
+    sorted.sort();
+    assert_eq!(
+        partition_strings, sorted,
+        "rows should be sorted by partition string"
+    );
+
+    // Cols 5-8 stay null with FileSystemCatalog (no metastore audit metadata).
+    for batch in &batches {
+        for col_idx in 5..=8 {
+            for i in 0..batch.num_rows() {
+                assert!(
+                    batch.column(col_idx).is_null(i),
+                    "column {col_idx} row {i} should be null"
+                );
+            }
+        }
+    }
+    // total_buckets defaults to 0 and done to false on FileSystemCatalog.
+    for batch in &batches {
+        let tb = batch
+            .column(9)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("total_buckets is Int32");
+        let done = batch
+            .column(10)
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .expect("done is Boolean");
+        for i in 0..batch.num_rows() {
+            assert_eq!(tb.value(i), 0, "total_buckets default for FS catalog");
+            assert!(!done.value(i), "done default for FS catalog");
+        }
+    }
+}
diff --git a/crates/paimon/src/catalog/partition_listing.rs b/crates/paimon/src/catalog/partition_listing.rs
index b71e30d..7652aba 100644
--- a/crates/paimon/src/catalog/partition_listing.rs
+++ b/crates/paimon/src/catalog/partition_listing.rs
@@ -22,7 +22,10 @@
 
 use std::collections::{BTreeMap, HashMap};
 
-use crate::spec::{BinaryRow, CoreOptions, Manifest, ManifestList, Partition, 
PartitionComputer};
+use crate::spec::{
+    merge_active_entries, BinaryRow, CoreOptions, Manifest, ManifestList, 
Partition,
+    PartitionComputer,
+};
 use crate::table::{SnapshotManager, Table};
 use crate::Result;
 
@@ -49,6 +52,9 @@ pub async fn list_partitions_from_file_system(table: &Table) -> Result<Vec<Parti
         let entries = Manifest::read(file_io, &manifest_path).await?;
         all_entries.extend(entries);
     }
+    // Mirror Java AbstractFileStoreScan.readAndMergeFileEntries: drop entries
+    // shadowed by a later DELETE so file_count/record_count reflect live files.
+    let all_entries = merge_active_entries(all_entries);
 
     let schema = table.schema();
     let core = CoreOptions::new(schema.options());
diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs
index 8131e7a..cd16133 100644
--- a/crates/paimon/src/spec/manifest.rs
+++ b/crates/paimon/src/spec/manifest.rs
@@ -68,6 +68,27 @@ impl Manifest {
     }
 }
 
+/// Merge ADD/DELETE entries by file identifier, returning only the active ADD set.
+/// Mirrors Java [FileEntry.mergeEntries](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java).
+/// Return order is unspecified.
+pub(crate) fn merge_active_entries(entries: Vec<ManifestEntry>) -> 
Vec<ManifestEntry> {
+    use std::collections::HashMap;
+
+    use crate::spec::manifest_entry::Identifier;
+    let mut map: HashMap<Identifier, ManifestEntry> = HashMap::new();
+    for entry in entries {
+        match entry.kind() {
+            FileKind::Add => {
+                map.insert(entry.identifier(), entry);
+            }
+            FileKind::Delete => {
+                map.remove(&entry.identifier());
+            }
+        }
+    }
+    map.into_values().collect()
+}
+
 #[cfg(test)]
 #[cfg(not(windows))] // Skip on Windows due to path compatibility issues
 mod tests {
@@ -96,4 +117,58 @@ mod tests {
         assert_eq!(t2.kind(), &FileKind::Add);
         assert_eq!(t2.bucket(), 2);
     }
+
+    #[test]
+    fn test_merge_active_entries_cancels_add_then_delete() {
+        use crate::spec::data_file::DataFileMeta;
+        use crate::spec::stats::BinaryTableStats;
+        use crate::spec::ManifestEntry;
+
+        fn entry(kind: FileKind, file_name: &str, level: i32) -> ManifestEntry 
{
+            let stats = BinaryTableStats::new(vec![], vec![], vec![]);
+            let file = DataFileMeta {
+                file_name: file_name.to_string(),
+                file_size: 100,
+                row_count: 10,
+                min_key: vec![],
+                max_key: vec![],
+                key_stats: stats.clone(),
+                value_stats: stats,
+                min_sequence_number: 0,
+                max_sequence_number: 0,
+                schema_id: 0,
+                level,
+                extra_files: vec![],
+                creation_time: None,
+                delete_row_count: None,
+                embedded_index: None,
+                file_source: None,
+                value_stats_cols: None,
+                external_path: None,
+                first_row_id: None,
+                write_cols: None,
+            };
+            ManifestEntry::new(kind, vec![], 0, 1, file, 2)
+        }
+
+        let cancelled = merge_active_entries(vec![
+            entry(FileKind::Add, "f.parquet", 0),
+            entry(FileKind::Delete, "f.parquet", 0),
+        ]);
+        assert!(cancelled.is_empty());
+
+        let two_levels = merge_active_entries(vec![
+            entry(FileKind::Add, "f.parquet", 0),
+            entry(FileKind::Add, "f.parquet", 1),
+        ]);
+        assert_eq!(two_levels.len(), 2);
+
+        let compacted = merge_active_entries(vec![
+            entry(FileKind::Add, "f.parquet", 0),
+            entry(FileKind::Delete, "f.parquet", 0),
+            entry(FileKind::Add, "f.parquet", 1),
+        ]);
+        assert_eq!(compacted.len(), 1);
+        assert_eq!(compacted[0].file().level, 1);
+    }
 }
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 270b27f..89de289 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -53,6 +53,7 @@ pub use index_file_meta::*;
 mod index_manifest;
 pub use index_manifest::{IndexManifest, IndexManifestEntry};
 mod manifest;
+pub(crate) use manifest::merge_active_entries;
 pub use manifest::Manifest;
 mod manifest_common;
 pub use manifest_common::FileKind;
diff --git a/docs/src/sql.md b/docs/src/sql.md
index 1d745d5..b075988 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -771,6 +771,30 @@ Columns:
 | `min_row_id` | BIGINT | Minimum row id covered (when row tracking is enabled) |
 | `max_row_id` | BIGINT | Maximum row id covered (when row tracking is enabled) |
 
+### $partitions
+
+View all partitions of a table with aggregated record counts and file sizes:
+
+```sql
+SELECT * FROM paimon.default.my_table$partitions;
+```
+
+Columns:
+
+| Column | Type | Description |
+|---|---|---|
+| `partition` | STRING | Partition spec, formatted as `key1=val1/key2=val2` |
+| `record_count` | BIGINT | Total record count across all data files in the partition |
+| `file_size_in_bytes` | BIGINT | Total file size in bytes |
+| `file_count` | BIGINT | Number of data files |
+| `last_update_time` | TIMESTAMP | Latest data-file creation time |
+| `created_at` | TIMESTAMP | Partition creation time (only available with metastore-tracked catalogs) |
+| `created_by` | STRING | Snapshot id that created the partition (catalog-tracked only) |
+| `updated_by` | STRING | Snapshot id that last updated the partition (catalog-tracked only) |
+| `options` | STRING | Per-partition options as flat JSON (catalog-tracked only) |
+| `total_buckets` | INT | Total bucket count for the partition (0 unless catalog-tracked) |
+| `done` | BOOLEAN | Whether the partition is marked done (false unless catalog-tracked) |
+
 ### Branch References
 
 System tables support branch syntax:

Reply via email to