This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ab68b26 feat: add referenced_files_size and physical_files_size table
functions (#323)
ab68b26 is described below
commit ab68b26bdd8e01c93e869fd7f560d5561c6dcddf
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 18 23:00:29 2026 +0800
feat: add referenced_files_size and physical_files_size table functions
(#323)
Introduce two DataFusion table-valued functions for storage analysis:
- `referenced_files_size('table')`: Aggregates manifest/data/index file
counts and sizes across all snapshots from main branch, tags, and other
branches. Output rows: total, branch:main, branch:<name>. Uses a shared
manifest cache to avoid redundant reads and processes snapshots
concurrently.
- `physical_files_size('table')`: Scans the table directory recursively and
reports actual physical file sizes categorized by file type (manifest,
data, index). Concurrently lists subdirectories for high throughput on
object stores.
Both functions gracefully handle NotFound errors from concurrently deleted
files during cleanup.
---
.gitignore | 3 +-
crates/integrations/datafusion/src/lib.rs | 4 +
.../datafusion/src/physical_files_size.rs | 175 ++++
.../datafusion/src/referenced_files_size.rs | 205 +++++
crates/paimon/src/io/file_io.rs | 38 +
crates/paimon/src/spec/index_manifest.rs | 12 +
crates/paimon/src/table/mod.rs | 1 +
crates/paimon/src/table/referenced_files.rs | 920 +++++++++++++++++++++
docs/src/sql.md | 99 +++
9 files changed, 1456 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index 6400720..61c0905 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,5 +20,6 @@
.idea
.vscode
**/.DS_Store
-dist/*
+**/dist/
+docs/site/
.qoder
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index 47f1bab..1c83017 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -43,8 +43,10 @@ mod filter_pushdown;
#[cfg(feature = "fulltext")]
mod full_text_search;
mod merge_into;
+mod physical_files_size;
mod physical_plan;
mod procedures;
+mod referenced_files_size;
mod relation_planner;
pub mod runtime;
mod sql_context;
@@ -67,7 +69,9 @@ pub use catalog::{PaimonCatalogProvider,
PaimonSchemaProvider};
pub use error::to_datafusion_error;
#[cfg(feature = "fulltext")]
pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
+pub use physical_files_size::{register_physical_files_size,
PhysicalFilesSizeFunction};
pub use physical_plan::PaimonTableScan;
+pub use referenced_files_size::{register_referenced_files_size,
ReferencedFilesSizeFunction};
pub use relation_planner::PaimonRelationPlanner;
pub use sql_context::SQLContext;
pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/physical_files_size.rs
b/crates/integrations/datafusion/src/physical_files_size.rs
new file mode 100644
index 0000000..14c3af4
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_files_size.rs
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table function that computes the total physical file sizes in the table
+//! directory.
+//!
+//! Usage: `SELECT * FROM physical_files_size('db.table_name')`
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{Int64Array, RecordBatch};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::catalog::TableFunctionImpl;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Catalog;
+use paimon::table::referenced_files::{collect_physical_files_summary,
PhysicalFilesSummary};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::table_function_args::{extract_string_literal,
parse_table_identifier};
+
+const FUNCTION_NAME: &str = "physical_files_size";
+
+/// Registers the `physical_files_size` table function on `ctx`.
+///
+/// The registered function is a [`PhysicalFilesSizeFunction`] built from
+/// `catalog` and `default_database`.
+pub fn register_physical_files_size(
+    ctx: &SessionContext,
+    catalog: Arc<dyn Catalog>,
+    default_database: &str,
+) {
+    let function = PhysicalFilesSizeFunction::new(catalog, default_database);
+    ctx.register_udtf(FUNCTION_NAME, Arc::new(function));
+}
+
+/// Table function behind `physical_files_size('table')`.
+///
+/// Holds the catalog used to look the table up and the database name that is
+/// passed to `parse_table_identifier` together with the table-name argument.
+pub struct PhysicalFilesSizeFunction {
+ // Catalog used by `call` to load the table.
+ catalog: Arc<dyn Catalog>,
+ // Database name handed to `parse_table_identifier`.
+ default_database: String,
+}
+
+// Hand-written `Debug`: only `default_database` is printed, the `catalog`
+// field is left out of the output.
+impl Debug for PhysicalFilesSizeFunction {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("PhysicalFilesSizeFunction");
+        out.field("default_database", &self.default_database);
+        out.finish()
+    }
+}
+
+impl PhysicalFilesSizeFunction {
+    /// Creates the table function from a catalog handle and the database name
+    /// to pass along when parsing table identifiers.
+    pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
+        let default_database = default_database.to_string();
+        Self {
+            catalog,
+            default_database,
+        }
+    }
+}
+
+impl TableFunctionImpl for PhysicalFilesSizeFunction {
+    /// Builds the table provider for `physical_files_size(<table_name>)`.
+    ///
+    /// Exactly one argument is accepted; it is read as a string literal,
+    /// parsed into a table identifier and loaded from the catalog.
+    ///
+    /// # Errors
+    /// Returns a `Plan` error when the argument count is not 1, and forwards
+    /// errors from argument extraction, identifier parsing and the catalog
+    /// lookup.
+    fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
+        let [table_arg] = args else {
+            return Err(datafusion::error::DataFusionError::Plan(
+                "physical_files_size requires 1 argument:
(table_name)".to_string(),
+            ));
+        };
+
+        let table_name = extract_string_literal(FUNCTION_NAME, table_arg, "table_name")?;
+        let identifier =
+            parse_table_identifier(FUNCTION_NAME, &table_name, &self.default_database)?;
+
+        // `call` is synchronous, so the async catalog lookup is driven through
+        // `block_on_with_runtime`.
+        let catalog = Arc::clone(&self.catalog);
+        let lookup = async move { catalog.get_table(&identifier).await };
+        let table = block_on_with_runtime(
+            lookup,
+            "physical_files_size: catalog access thread panicked",
+        )
+        .map_err(to_datafusion_error)?;
+
+        let provider: Arc<dyn TableProvider> = Arc::new(PhysicalFilesSizeTableProvider { table });
+        Ok(provider)
+    }
+}
+
+/// Returns the output schema of `physical_files_size`: six non-nullable
+/// `Int64` columns. The schema is built once and shared afterwards.
+fn output_schema() -> SchemaRef {
+    const COLUMNS: [&str; 6] = [
+        "manifest_file_count",
+        "manifest_file_size",
+        "data_file_count",
+        "data_file_size",
+        "index_file_count",
+        "index_file_size",
+    ];
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+    let schema = SCHEMA.get_or_init(|| {
+        let fields: Vec<Field> = COLUMNS
+            .iter()
+            .map(|name| Field::new(*name, DataType::Int64, false))
+            .collect();
+        Arc::new(Schema::new(fields))
+    });
+    Arc::clone(schema)
+}
+
+/// Table provider returned by `PhysicalFilesSizeFunction::call`; it keeps the
+/// resolved table so `scan` can compute the summary.
+#[derive(Debug)]
+struct PhysicalFilesSizeTableProvider {
+ // Table whose `file_io()` and `location()` are passed to the collector.
+ table: Table,
+}
+
+#[async_trait]
+impl TableProvider for PhysicalFilesSizeTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Schema of the single summary row (see `output_schema`).
+    fn schema(&self) -> SchemaRef {
+        output_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    /// Computes the physical file summary and exposes it as a one-batch
+    /// in-memory plan. Filters and limit are ignored; `projection` is handed
+    /// to the memory source.
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let table = self.table.clone();
+        let collect = async move {
+            collect_physical_files_summary(table.file_io(), table.location()).await
+        };
+        let summary = await_with_runtime(collect)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        let partitions = [vec![summary_to_record_batch(&summary)?]];
+        let plan: Arc<dyn ExecutionPlan> =
+            MemorySourceConfig::try_new_exec(&partitions, output_schema(), projection.cloned())?;
+        Ok(plan)
+    }
+}
+
+/// Converts a physical files summary into a single-row record batch whose
+/// columns follow the order of `output_schema`.
+fn summary_to_record_batch(s: &PhysicalFilesSummary) -> DFResult<RecordBatch> {
+    // Same order as the schema fields.
+    let values = [
+        s.manifest_file_count,
+        s.manifest_file_size,
+        s.data_file_count,
+        s.data_file_size,
+        s.index_file_count,
+        s.index_file_size,
+    ];
+    let columns: Vec<Arc<dyn datafusion::arrow::array::Array>> = values
+        .into_iter()
+        .map(|value| {
+            Arc::new(Int64Array::from(vec![value])) as Arc<dyn datafusion::arrow::array::Array>
+        })
+        .collect();
+
+    let batch = RecordBatch::try_new(output_schema(), columns)?;
+    Ok(batch)
+}
diff --git a/crates/integrations/datafusion/src/referenced_files_size.rs
b/crates/integrations/datafusion/src/referenced_files_size.rs
new file mode 100644
index 0000000..387a538
--- /dev/null
+++ b/crates/integrations/datafusion/src/referenced_files_size.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Table function that computes per-snapshot referenced file size summaries.
+//!
+//! Usage: `SELECT * FROM referenced_files_size('db.table_name')`
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::catalog::Session;
+use datafusion::catalog::TableFunctionImpl;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+use paimon::catalog::Catalog;
+use paimon::table::referenced_files::{collect_referenced_files_summary,
ReferencedFilesSummary};
+use paimon::table::Table;
+
+use crate::error::to_datafusion_error;
+use crate::runtime::{await_with_runtime, block_on_with_runtime};
+use crate::table_function_args::{extract_string_literal,
parse_table_identifier};
+
+const FUNCTION_NAME: &str = "referenced_files_size";
+
+/// Registers the `referenced_files_size` table function on `ctx`.
+///
+/// The registered function is a [`ReferencedFilesSizeFunction`] built from
+/// `catalog` and `default_database`.
+pub fn register_referenced_files_size(
+    ctx: &SessionContext,
+    catalog: Arc<dyn Catalog>,
+    default_database: &str,
+) {
+    let function = ReferencedFilesSizeFunction::new(catalog, default_database);
+    ctx.register_udtf(FUNCTION_NAME, Arc::new(function));
+}
+
+/// Table function behind `referenced_files_size('table')`.
+///
+/// Holds the catalog used to look the table up and the database name that is
+/// passed to `parse_table_identifier` together with the table-name argument.
+pub struct ReferencedFilesSizeFunction {
+ // Catalog used by `call` to load the table.
+ catalog: Arc<dyn Catalog>,
+ // Database name handed to `parse_table_identifier`.
+ default_database: String,
+}
+
+// Hand-written `Debug`: only `default_database` is printed, the `catalog`
+// field is left out of the output.
+impl Debug for ReferencedFilesSizeFunction {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("ReferencedFilesSizeFunction");
+        out.field("default_database", &self.default_database);
+        out.finish()
+    }
+}
+
+impl ReferencedFilesSizeFunction {
+    /// Creates the table function from a catalog handle and the database name
+    /// to pass along when parsing table identifiers.
+    pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
+        let default_database = default_database.to_string();
+        Self {
+            catalog,
+            default_database,
+        }
+    }
+}
+
+impl TableFunctionImpl for ReferencedFilesSizeFunction {
+    /// Builds the table provider for `referenced_files_size(<table_name>)`.
+    ///
+    /// Exactly one argument is accepted; it is read as a string literal,
+    /// parsed into a table identifier and loaded from the catalog.
+    ///
+    /// # Errors
+    /// Returns a `Plan` error when the argument count is not 1, and forwards
+    /// errors from argument extraction, identifier parsing and the catalog
+    /// lookup.
+    fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
+        let [table_arg] = args else {
+            return Err(datafusion::error::DataFusionError::Plan(
+                "referenced_files_size requires 1 argument:
(table_name)".to_string(),
+            ));
+        };
+
+        let table_name = extract_string_literal(FUNCTION_NAME, table_arg, "table_name")?;
+        let identifier =
+            parse_table_identifier(FUNCTION_NAME, &table_name, &self.default_database)?;
+
+        // `call` is synchronous, so the async catalog lookup is driven through
+        // `block_on_with_runtime`.
+        let catalog = Arc::clone(&self.catalog);
+        let lookup = async move { catalog.get_table(&identifier).await };
+        let table = block_on_with_runtime(
+            lookup,
+            "referenced_files_size: catalog access thread panicked",
+        )
+        .map_err(to_datafusion_error)?;
+
+        let provider: Arc<dyn TableProvider> =
+            Arc::new(ReferencedFilesSizeTableProvider { table });
+        Ok(provider)
+    }
+}
+
+/// Returns the output schema of `referenced_files_size`: a non-nullable
+/// `Utf8` `source` column followed by six non-nullable `Int64` columns.
+/// The schema is built once and shared afterwards.
+fn output_schema() -> SchemaRef {
+    const INT_COLUMNS: [&str; 6] = [
+        "manifest_file_count",
+        "manifest_file_size",
+        "data_file_count",
+        "data_file_size",
+        "index_file_count",
+        "index_file_size",
+    ];
+    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+
+    let schema = SCHEMA.get_or_init(|| {
+        let mut fields = vec![Field::new("source", DataType::Utf8, false)];
+        fields.extend(
+            INT_COLUMNS
+                .iter()
+                .map(|name| Field::new(*name, DataType::Int64, false)),
+        );
+        Arc::new(Schema::new(fields))
+    });
+    Arc::clone(schema)
+}
+
+/// Table provider returned by `ReferencedFilesSizeFunction::call`; it keeps
+/// the resolved table so `scan` can compute the per-scope summaries.
+#[derive(Debug)]
+struct ReferencedFilesSizeTableProvider {
+ // Table whose file IO, location and schema feed the collector in `scan`.
+ table: Table,
+}
+
+#[async_trait]
+impl TableProvider for ReferencedFilesSizeTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Schema of the summary rows (see `output_schema`).
+    fn schema(&self) -> SchemaRef {
+        output_schema()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    /// Computes the referenced file summaries and exposes them as a one-batch
+    /// in-memory plan. Filters and limit are ignored; `projection` is handed
+    /// to the memory source.
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let table = self.table.clone();
+        let collect = async move {
+            let table_schema = table.schema();
+            let partition_keys = table_schema.partition_keys();
+            let partition_fields = table_schema.partition_fields();
+            collect_referenced_files_summary(
+                table.file_io(),
+                table.location(),
+                partition_keys,
+                &partition_fields,
+            )
+            .await
+        };
+        let summaries = await_with_runtime(collect)
+            .await
+            .map_err(to_datafusion_error)?;
+
+        let partitions = [vec![summaries_to_record_batch(&summaries)?]];
+        let plan: Arc<dyn ExecutionPlan> =
+            MemorySourceConfig::try_new_exec(&partitions, output_schema(), projection.cloned())?;
+        Ok(plan)
+    }
+}
+
+/// Converts the per-scope summaries into one record batch with one row per
+/// summary, columns in the order of `output_schema`.
+fn summaries_to_record_batch(summaries: &[ReferencedFilesSummary]) -> DFResult<RecordBatch> {
+    type Column = Arc<dyn datafusion::arrow::array::Array>;
+
+    // Builds one Int64 column by projecting a single field out of every row.
+    let int_column = |pick: fn(&ReferencedFilesSummary) -> i64| -> Column {
+        Arc::new(Int64Array::from(
+            summaries.iter().map(pick).collect::<Vec<i64>>(),
+        ))
+    };
+
+    let sources: Vec<&str> = summaries.iter().map(|s| s.source.as_str()).collect();
+    let columns: Vec<Column> = vec![
+        Arc::new(StringArray::from(sources)),
+        int_column(|s| s.manifest_file_count),
+        int_column(|s| s.manifest_file_size),
+        int_column(|s| s.data_file_count),
+        int_column(|s| s.data_file_size),
+        int_column(|s| s.index_file_count),
+        int_column(|s| s.index_file_size),
+    ];
+
+    let batch = RecordBatch::try_new(output_schema(), columns)?;
+    Ok(batch)
+}
diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs
index eef5dd6..7e78004 100644
--- a/crates/paimon/src/io/file_io.rs
+++ b/crates/paimon/src/io/file_io.rs
@@ -158,6 +158,44 @@ impl FileIO {
Ok(statuses)
}
+    /// List all files recursively under the given directory path.
+    ///
+    /// Directory entries and the entry for the listed root itself are left
+    /// out. Each returned path is the entry path prefixed with the part of
+    /// `path` that precedes the storage-relative path.
+    pub async fn list_status_recursive(&self, path: &str) -> Result<Vec<FileStatus>> {
+        let (op, relative_path) = self.storage.create(path)?;
+        let base_path = &path[..path.len() - relative_path.len()];
+        let list_path = normalize_root(relative_path);
+
+        let entries = op
+            .list_with(&list_path)
+            .recursive(true)
+            .await
+            .context(IoUnexpectedSnafu {
+                message: format!("Failed to list files recursively in
'{path}'"),
+            })?;
+
+        let root = list_path.trim_start_matches('/');
+        let statuses: Vec<FileStatus> = entries
+            .into_iter()
+            // Skip the listed directory itself.
+            .filter(|entry| entry.path().trim_start_matches('/') != root)
+            // Only files are reported.
+            .filter(|entry| !entry.metadata().is_dir())
+            .map(|entry| {
+                let entry_path = entry.path();
+                let meta = entry.metadata();
+                FileStatus {
+                    size: meta.content_length(),
+                    is_dir: false,
+                    path: format!("{base_path}{entry_path}"),
+                    last_modified: meta
+                        .last_modified()
+                        .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
+                }
+            })
+            .collect();
+
+        Ok(statuses)
+    }
+
+
/// Check if exists.
///
/// References:
<https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
diff --git a/crates/paimon/src/spec/index_manifest.rs
b/crates/paimon/src/spec/index_manifest.rs
index 2599443..cdafc52 100644
--- a/crates/paimon/src/spec/index_manifest.rs
+++ b/crates/paimon/src/spec/index_manifest.rs
@@ -122,6 +122,18 @@ impl IndexManifest {
Self::read_from_bytes(&content)
}
+    /// Read index manifest entries and return them together with the size of
+    /// the file content in bytes (the length of the bytes that were read).
+    pub async fn read_with_size(
+        file_io: &FileIO,
+        path: &str,
+    ) -> Result<(Vec<IndexManifestEntry>, i64)> {
+        let content = file_io.new_input(path)?.read().await?;
+        let entries = Self::read_from_bytes(&content)?;
+        Ok((entries, content.len() as i64))
+    }
+
+
/// Read index manifest entries from Avro-encoded bytes.
pub fn read_from_bytes(bytes: &[u8]) -> Result<Vec<IndexManifestEntry>> {
crate::spec::avro::from_avro_bytes_fast(bytes)
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index c84d7c8..c00fa78 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -40,6 +40,7 @@ mod kv_file_writer;
mod partition_filter;
mod postpone_file_writer;
mod read_builder;
+pub mod referenced_files;
pub(crate) mod rest_env;
pub(crate) mod row_id_predicate;
pub(crate) mod schema_manager;
diff --git a/crates/paimon/src/table/referenced_files.rs
b/crates/paimon/src/table/referenced_files.rs
new file mode 100644
index 0000000..2c41c67
--- /dev/null
+++ b/crates/paimon/src/table/referenced_files.rs
@@ -0,0 +1,920 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Collect deduplicated referenced file size summaries for all snapshots of a
+//! table.
+//!
+//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java)
+
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use crate::io::FileIO;
+use crate::spec::{
+ bucket_dir_name, BinaryRow, DataField, IndexManifest, Manifest,
ManifestEntry,
+ ManifestFileMeta, PartitionComputer,
+};
+use crate::table::{BranchManager, SnapshotManager, TagManager};
+use futures::future::try_join_all;
+use futures::stream::{self, StreamExt, TryStreamExt};
+
+/// Per-scope aggregated summary of referenced files (deduplicated).
+///
+/// Each row represents the unique referenced files for a scope:
+/// - `"total"`: all snapshots across all branches and tags
+/// - `"branch:main"`: main branch snapshots + tags
+/// - `"branch:<name>"`: a specific branch
+///
+/// Files are deduplicated by file name within each scope, so the sum
+/// represents actual disk usage that is still referenced (protected from
+/// cleanup).
+/// Both ADD and DELETE manifest entries are included since both reference
+/// physical files that cannot be removed until the snapshot expires.
+#[derive(Debug, Clone, Default)]
+pub struct ReferencedFilesSummary {
+ /// Scope label of the row, e.g. `"total"`, `"branch:main"` or `"branch:<name>"`.
+ pub source: String,
+ /// Number of distinct manifest-type file names in the scope.
+ pub manifest_file_count: i64,
+ /// Sum of the recorded sizes of those manifest-type files.
+ pub manifest_file_size: i64,
+ /// Number of distinct data file names in the scope.
+ pub data_file_count: i64,
+ /// Sum of the recorded sizes of those data files.
+ pub data_file_size: i64,
+ /// Number of distinct index file names in the scope.
+ pub index_file_count: i64,
+ /// Sum of the recorded sizes of those index files.
+ pub index_file_size: i64,
+}
+
+/// Deduplicated file set for a scope, keyed by file name.
+#[derive(Default)]
+struct ScopeFileSet {
+ // Each map goes from file name to the size recorded for that file; using
+ // the name as key is what deduplicates files within a scope.
+ manifest_files: HashMap<String, i64>,
+ data_files: HashMap<String, i64>,
+ index_files: HashMap<String, i64>,
+}
+
+impl ScopeFileSet {
+    /// Produces the summary row for this set, labelled with `source`.
+    /// Counts are the number of map entries, sizes the sum of the map values.
+    fn to_summary(&self, source: &str) -> ReferencedFilesSummary {
+        let tally = |files: &HashMap<String, i64>| -> (i64, i64) {
+            (files.len() as i64, files.values().sum())
+        };
+        let (manifest_file_count, manifest_file_size) = tally(&self.manifest_files);
+        let (data_file_count, data_file_size) = tally(&self.data_files);
+        let (index_file_count, index_file_size) = tally(&self.index_files);
+
+        ReferencedFilesSummary {
+            source: source.to_string(),
+            manifest_file_count,
+            manifest_file_size,
+            data_file_count,
+            data_file_size,
+            index_file_count,
+            index_file_size,
+        }
+    }
+
+    /// Adds every file of `other` that is not present yet. For a name that
+    /// already exists, the size recorded in `self` is kept.
+    fn merge(&mut self, other: &ScopeFileSet) {
+        let ScopeFileSet {
+            manifest_files,
+            data_files,
+            index_files,
+        } = self;
+        let pairs = [
+            (manifest_files, &other.manifest_files),
+            (data_files, &other.data_files),
+            (index_files, &other.index_files),
+        ];
+        for (target, incoming) in pairs {
+            for (name, size) in incoming {
+                target.entry(name.clone()).or_insert(*size);
+            }
+        }
+    }
+}
+
+/// Upper bound on the number of snapshots processed concurrently; passed to
+/// `buffer_unordered` in `collect_scope_files`.
+const SNAPSHOT_CONCURRENCY: usize = 32;
+
+/// Cached data file entries (file_name, file_size) per manifest file full
+/// path.
+type ManifestCache = Mutex<HashMap<String, Vec<(String, i64)>>>;
+
+/// Resolves extra file paths for stat-ing their real sizes.
+struct ExtraFileResolver {
+    table_location: String,
+    /// Present when the table has partition keys and a `PartitionComputer`
+    /// could be built for them.
+    partition_computer: Option<PartitionComputer>,
+    /// True when the table has partition keys but building the
+    /// `PartitionComputer` failed. In that state no in-table path can be
+    /// resolved, because the partition directory is unknown.
+    partition_layout_unknown: bool,
+}
+
+impl ExtraFileResolver {
+    /// Creates a resolver for the table at `table_location`.
+    ///
+    /// With no partition keys, paths are built directly under the table
+    /// location. Otherwise a `PartitionComputer` is created; if that fails,
+    /// the failure is remembered so `resolve_extra_file_path` can decline to
+    /// build paths instead of falling back to the unpartitioned layout.
+    fn new(table_location: &str, partition_keys: &[String], schema_fields: &[DataField]) -> Self {
+        let partition_computer = if partition_keys.is_empty() {
+            None
+        } else {
+            PartitionComputer::new(
+                partition_keys,
+                schema_fields,
+                "__DEFAULT_PARTITION__",
+                false,
+            )
+            .ok()
+        };
+        let partition_layout_unknown = !partition_keys.is_empty() && partition_computer.is_none();
+        Self {
+            table_location: table_location.to_string(),
+            partition_computer,
+            partition_layout_unknown,
+        }
+    }
+
+    /// Builds the full path of an extra file, or returns `None` when it
+    /// cannot be determined.
+    ///
+    /// - With `external_path`, the result is `<external_path>/<extra_file_name>`
+    ///   (trailing `/` trimmed from `external_path`).
+    /// - Otherwise the result is
+    ///   `<table_location>/<partition_path><bucket_dir>/<extra_file_name>`.
+    ///
+    /// `None` is returned when the partition row cannot be decoded, the
+    /// partition path cannot be generated, or the table is partitioned but no
+    /// `PartitionComputer` is available.
+    fn resolve_extra_file_path(
+        &self,
+        partition_bytes: &[u8],
+        bucket: i32,
+        extra_file_name: &str,
+        external_path: Option<&str>,
+    ) -> Option<String> {
+        if let Some(ext_path) = external_path {
+            // NOTE(review): in Apache Paimon's Java implementation a data
+            // file's `externalPath` is the full path of the data file and
+            // extra files are resolved against its parent directory. Confirm
+            // that treating it as a directory here is intended.
+            let dir = ext_path.trim_end_matches('/');
+            return Some(format!("{}/{}", dir, extra_file_name));
+        }
+        if self.partition_layout_unknown {
+            // Without the partition directory any path built here would point
+            // at the unpartitioned layout, which is wrong for this table.
+            return None;
+        }
+        let partition_path = match &self.partition_computer {
+            Some(computer) => {
+                let row = BinaryRow::from_serialized_bytes(partition_bytes).ok()?;
+                computer.generate_partition_path(&row).ok()?
+            }
+            None => String::new(),
+        };
+        let bucket_dir = bucket_dir_name(bucket);
+        // No separator is inserted between `partition_path` and `bucket_dir`;
+        // presumably `generate_partition_path` ends with `/` — TODO confirm.
+        Some(format!(
+            "{}/{}{}/{}",
+            self.table_location, partition_path, bucket_dir, extra_file_name
+        ))
+    }
+}
+
+/// Collect per-scope deduplicated referenced file size summaries for a table.
+///
+/// Returns rows:
+/// 1. `"total"` — union of all referenced files from main branch, tags, and
+///    branches
+/// 2. `"branch:main"` — main branch snapshots + tag snapshots
+/// 3. `"branch:<name>"` — one row per branch
+///
+/// Snapshots are processed concurrently (up to 32 at a time). Within each
+/// snapshot, manifest list and manifest file reads are also concurrent.
+/// A shared cache avoids re-reading the same manifest file across snapshots.
+///
+/// Files are deduplicated by name within each scope to produce an accurate
+/// count of unique referenced files. Both ADD and DELETE entries are included
+/// since both reference physical files protected from cleanup.
+///
+/// Manifest list files and index manifest files are counted as manifest files,
+/// consistent with `physical_files_size` classification.
+///
+/// Extra files referenced by data file entries are stat-ed to obtain their
+/// real sizes, using partition/bucket info to construct full paths.
+///
+/// # Errors
+/// Any error returned while collecting the main scope, the tags, the branch
+/// list or a branch scope is propagated to the caller.
+pub async fn collect_referenced_files_summary(
+ file_io: &FileIO,
+ table_location: &str,
+ partition_keys: &[String],
+ schema_fields: &[DataField],
+) -> crate::Result<Vec<ReferencedFilesSummary>> {
+ // One cache and one resolver are shared by every scope processed below.
+ let manifest_cache: ManifestCache = Mutex::new(HashMap::new());
+ let manifest_cache_ref = &manifest_cache;
+ let extra_resolver = ExtraFileResolver::new(table_location,
partition_keys, schema_fields);
+ let extra_resolver_ref = &extra_resolver;
+
+ let sm = SnapshotManager::new(file_io.clone(), table_location.to_string());
+ let tm = TagManager::new(file_io.clone(), table_location.to_string());
+
+ // 1. Main branch snapshots + tags (concurrently)
+ // For main branch, snapshot reading and manifest resolution both use root
+ // SM.
+ let (main_files, tag_files) = tokio::try_join!(
+ collect_scope_files(file_io, &sm, &sm, manifest_cache_ref,
extra_resolver_ref),
+ collect_tag_files(
+ file_io,
+ &sm,
+ &sm,
+ &tm,
+ manifest_cache_ref,
+ extra_resolver_ref
+ ),
+ )?;
+ // Tag files are folded into the main scope.
+ let mut main_files = main_files;
+ main_files.merge(&tag_files);
+
+ // 2. Branch file sets (all branches concurrently)
+ let bm = BranchManager::new(file_io.clone(), table_location.to_string());
+ let branch_names = bm.list_all().await?;
+
+ let sm_ref = &sm;
+ let branch_futures: Vec<_> = branch_names
+ .iter()
+ .map(|branch_name| {
+ let branch_sm = sm.with_branch(branch_name);
+ let branch_tm = tm.with_branch(branch_name);
+ async move {
+ // Branch SM reads snapshot/tag files from branch path,
+ // but manifest paths are always resolved from the table root.
+ let (mut branch_files, branch_tag_files) = tokio::try_join!(
+ collect_scope_files(
+ file_io,
+ &branch_sm,
+ sm_ref,
+ manifest_cache_ref,
+ extra_resolver_ref
+ ),
+ collect_tag_files(
+ file_io,
+ &branch_sm,
+ sm_ref,
+ &branch_tm,
+ manifest_cache_ref,
+ extra_resolver_ref
+ ),
+ )?;
+ branch_files.merge(&branch_tag_files);
+ Ok::<_, crate::Error>(branch_files)
+ }
+ })
+ .collect();
+ // Results come back in the same order as `branch_names`, which the zip
+ // further down relies on.
+ let branch_results = try_join_all(branch_futures).await?;
+
+ // 3. Assemble output: total, main, branches
+ let mut total_files = ScopeFileSet::default();
+ total_files.merge(&main_files);
+ for bs in &branch_results {
+ total_files.merge(bs);
+ }
+
+ let mut result = vec![
+ total_files.to_summary("total"),
+ main_files.to_summary("branch:main"),
+ ];
+ for (name, files) in branch_names.iter().zip(&branch_results) {
+ result.push(files.to_summary(&format!("branch:{name}")));
+ }
+ Ok(result)
+}
+
+/// Collects the merged file set of every snapshot listed by `sm`.
+///
+/// Snapshot files are read through `sm`, while manifest paths are resolved
+/// through `manifest_sm`. At most `SNAPSHOT_CONCURRENCY` snapshots are in
+/// flight at once; snapshots that yield `None` are skipped.
+async fn collect_scope_files(
+    file_io: &FileIO,
+    sm: &SnapshotManager,
+    manifest_sm: &SnapshotManager,
+    manifest_cache: &ManifestCache,
+    extra_resolver: &ExtraFileResolver,
+) -> crate::Result<ScopeFileSet> {
+    let ids = sm.list_all_ids().await?;
+
+    let snapshot_sets: Vec<Option<ScopeFileSet>> = stream::iter(ids)
+        .map(|id| {
+            let owned_sm = sm.clone();
+            async move {
+                collect_single_snapshot_files(
+                    file_io,
+                    &owned_sm,
+                    manifest_sm,
+                    id,
+                    manifest_cache,
+                    extra_resolver,
+                )
+                .await
+            }
+        })
+        .buffer_unordered(SNAPSHOT_CONCURRENCY)
+        .try_collect()
+        .await?;
+
+    let merged = snapshot_sets
+        .into_iter()
+        .flatten()
+        .fold(ScopeFileSet::default(), |mut acc, set| {
+            acc.merge(&set);
+            acc
+        });
+    Ok(merged)
+}
+
+/// Collects the merged file set of every tag listed by `tm`.
+///
+/// Each tag is resolved to its snapshot through `tm`; tags for which `tm.get`
+/// returns `None` are skipped. Manifest paths are resolved through
+/// `manifest_sm`. `_sm` is accepted for signature parity with
+/// `collect_scope_files` and is not used.
+///
+/// At most `SNAPSHOT_CONCURRENCY` tags are processed at once, the same cap
+/// that applies to snapshots, so a table with many tags does not start an
+/// unbounded number of reads. Results are merged in tag-name order.
+///
+/// # Errors
+/// Propagates errors from listing tags, reading a tag, or collecting the
+/// files of a tagged snapshot.
+async fn collect_tag_files(
+    file_io: &FileIO,
+    _sm: &SnapshotManager,
+    manifest_sm: &SnapshotManager,
+    tm: &TagManager,
+    manifest_cache: &ManifestCache,
+    extra_resolver: &ExtraFileResolver,
+) -> crate::Result<ScopeFileSet> {
+    let tag_names = tm.list_all_names().await?;
+
+    // Tag names are moved into the futures, mirroring how
+    // `collect_scope_files` feeds owned snapshot ids into its stream.
+    let tag_results: Vec<Option<ScopeFileSet>> = stream::iter(tag_names)
+        .map(|tag_name| async move {
+            let snapshot = match tm.get(&tag_name).await? {
+                Some(s) => s,
+                None => return Ok(None),
+            };
+            collect_snapshot_files(
+                file_io,
+                manifest_sm,
+                &snapshot,
+                manifest_cache,
+                extra_resolver,
+            )
+            .await
+        })
+        // `buffered` (not `buffer_unordered`) keeps the original merge order.
+        .buffered(SNAPSHOT_CONCURRENCY)
+        .try_collect()
+        .await?;
+
+    let mut merged = ScopeFileSet::default();
+    for fs in tag_results.into_iter().flatten() {
+        merged.merge(&fs);
+    }
+    Ok(merged)
+}
+
+/// Loads snapshot `snapshot_id` through `sm` and collects its file set.
+///
+/// Returns `Ok(None)` when `try_get_snapshot` yields no snapshot; otherwise
+/// delegates to `collect_snapshot_files`, resolving manifests via
+/// `manifest_sm`.
+async fn collect_single_snapshot_files(
+    file_io: &FileIO,
+    sm: &SnapshotManager,
+    manifest_sm: &SnapshotManager,
+    snapshot_id: i64,
+    manifest_cache: &ManifestCache,
+    extra_resolver: &ExtraFileResolver,
+) -> crate::Result<Option<ScopeFileSet>> {
+    let Some(snapshot) = try_get_snapshot(sm, snapshot_id).await? else {
+        return Ok(None);
+    };
+
+    collect_snapshot_files(
+        file_io,
+        manifest_sm,
+        &snapshot,
+        manifest_cache,
+        extra_resolver,
+    )
+    .await
+}
+
+/// Collects every file referenced by one snapshot into a `ScopeFileSet`.
+///
+/// Steps, in order:
+/// 1. read the base, delta and (if present) changelog manifest lists and
+///    record them as manifest files when their size is positive;
+/// 2. record every manifest file listed in them;
+/// 3. read the manifests that are not in `manifest_cache` yet, stat their
+///    extra files, and store the resulting (name, size) entries in the cache;
+/// 4. copy the cached entries of this snapshot's manifests into `data_files`;
+/// 5. read the index manifest, if any, recording it as a manifest file and
+///    its entries as index files;
+/// 6. stat the statistics file, if any, and record it as a manifest file.
+///
+/// Manifest paths are always resolved through `manifest_sm`. The function
+/// always returns `Ok(Some(..))` on success.
+async fn collect_snapshot_files(
+ file_io: &FileIO,
+ manifest_sm: &SnapshotManager,
+ snapshot: &crate::spec::Snapshot,
+ manifest_cache: &ManifestCache,
+ extra_resolver: &ExtraFileResolver,
+) -> crate::Result<Option<ScopeFileSet>> {
+ let mut file_set = ScopeFileSet::default();
+
+ // Collect manifest list file names (these are manifest-type files
+ // themselves)
+ let mut manifest_list_names = vec![
+ snapshot.base_manifest_list().to_string(),
+ snapshot.delta_manifest_list().to_string(),
+ ];
+ if let Some(cl) = snapshot.changelog_manifest_list() {
+ manifest_list_names.push(cl.to_string());
+ }
+
+ // Pre-compute paths (always resolved from table root)
+ let manifest_list_paths: Vec<String> = manifest_list_names
+ .iter()
+ .map(|name| manifest_sm.manifest_path(name))
+ .collect();
+
+ // Read all manifest lists concurrently and record their sizes
+ let manifest_list_futures: Vec<_> = manifest_list_paths
+ .iter()
+ .map(|path| try_read_manifest_list_with_size(file_io, path))
+ .collect();
+ let manifest_list_results = try_join_all(manifest_list_futures).await?;
+
+ // Register manifest list files themselves as manifest files
+ // (lists whose reported size is not positive are left out).
+ for (name, (_, size)) in
manifest_list_names.iter().zip(&manifest_list_results) {
+ if *size > 0 {
+ file_set.manifest_files.entry(name.clone()).or_insert(*size);
+ }
+ }
+
+ // Flatten all manifest file metas from all manifest lists
+ let all_manifest_metas: Vec<&ManifestFileMeta> = manifest_list_results
+ .iter()
+ .flat_map(|(metas, _)| metas.iter())
+ .collect();
+
+ // Register manifest files
+ for meta in &all_manifest_metas {
+ file_set
+ .manifest_files
+ .entry(meta.file_name().to_string())
+ .or_insert(meta.file_size());
+ }
+
+ // Read manifest files to get data file entries, using cache by full path
+ let manifest_paths: Vec<String> = all_manifest_metas
+ .iter()
+ .map(|meta| manifest_sm.manifest_path(meta.file_name()))
+ .collect();
+
+ // The lock is taken and released once per path here and is not held
+ // across the reads below.
+ let uncached_indices: Vec<usize> = manifest_paths
+ .iter()
+ .enumerate()
+ .filter(|(_, path)| {
+ let cache = manifest_cache.lock().unwrap();
+ !cache.contains_key(path.as_str())
+ })
+ .map(|(i, _)| i)
+ .collect();
+
+ if !uncached_indices.is_empty() {
+ let uncached_paths: Vec<&str> = uncached_indices
+ .iter()
+ .map(|&i| manifest_paths[i].as_str())
+ .collect();
+
+ let manifest_futures: Vec<_> = uncached_paths
+ .iter()
+ .map(|path| try_read_manifest(file_io, path))
+ .collect();
+ let results = try_join_all(manifest_futures).await?;
+
+ // Collect extra files that need stat-ing
+ // Each task is (manifest index, entry index, full path); the two indices
+ // address the slot in `all_file_entries` that receives the stat-ed size.
+ let mut extra_file_stat_tasks: Vec<(usize, usize, String)> =
Vec::new();
+ let mut all_file_entries: Vec<Vec<(String, i64)>> =
Vec::with_capacity(results.len());
+
+ for (manifest_idx, entries) in results.iter().enumerate() {
+ let mut file_entries: Vec<(String, i64)> = Vec::new();
+ for e in entries {
+ file_entries.push((e.file().file_name.clone(),
e.file().file_size));
+ for extra in &e.file().extra_files {
+ // Taken before the push below, so it is the index the extra file
+ // entry will occupy.
+ let entry_idx = file_entries.len();
+ let full_path = extra_resolver.resolve_extra_file_path(
+ e.partition(),
+ e.bucket(),
+ extra,
+ e.file().external_path.as_deref(),
+ );
+ if let Some(path) = full_path {
+ extra_file_stat_tasks.push((manifest_idx, entry_idx,
path));
+ }
+ // Size starts at 0 and is overwritten below if the stat yields a
+ // positive size; the entry is kept either way.
+ file_entries.push((extra.clone(), 0));
+ }
+ }
+ all_file_entries.push(file_entries);
+ }
+
+ // Batch stat extra files concurrently
+ if !extra_file_stat_tasks.is_empty() {
+ let stat_futures: Vec<_> = extra_file_stat_tasks
+ .iter()
+ .map(|(_, _, path)| try_stat_file_size(file_io, path))
+ .collect();
+ let stat_results = try_join_all(stat_futures).await?;
+
+ for ((manifest_idx, entry_idx, _), size) in
+ extra_file_stat_tasks.iter().zip(stat_results)
+ {
+ if size > 0 {
+ all_file_entries[*manifest_idx][*entry_idx].1 = size;
+ }
+ }
+ }
+
+ // Publish the freshly read manifests to the shared cache.
+ let mut cache = manifest_cache.lock().unwrap();
+ for (path, file_entries) in
uncached_paths.into_iter().zip(all_file_entries) {
+ cache.insert(path.to_string(), file_entries);
+ }
+ }
+
+ // Collect data files from cache (deduplicated by HashMap key)
+ {
+ let cache = manifest_cache.lock().unwrap();
+ for path in &manifest_paths {
+ if let Some(entries) = cache.get(path.as_str()) {
+ for (name, size) in entries {
+ file_set.data_files.entry(name.clone()).or_insert(*size);
+ }
+ }
+ }
+ }
+
+ // Read index manifest if present
+ if let Some(index_manifest_name) = snapshot.index_manifest() {
+ // The index manifest file itself is a manifest-type file
+ let index_manifest_path =
manifest_sm.manifest_path(index_manifest_name);
+ // Tuple of (index manifest entries, size of the index manifest file).
+ let index_entries =
+ try_read_index_manifest_with_size(file_io,
&index_manifest_path).await?;
+
+ if index_entries.1 > 0 {
+ file_set
+ .manifest_files
+ .entry(index_manifest_name.to_string())
+ .or_insert(index_entries.1);
+ }
+
+ for entry in &index_entries.0 {
+ file_set
+ .index_files
+ .entry(entry.index_file.file_name.clone())
+ .or_insert(entry.index_file.file_size as i64);
+ }
+ }
+
+ // Collect statistics file if present
+ // (it is counted among the manifest files).
+ if let Some(statistics_name) = snapshot.statistics() {
+ let statistics_path = format!(
+ "{}/statistics/{}",
+ extra_resolver.table_location, statistics_name
+ );
+ let size = try_stat_file_size(file_io, &statistics_path).await?;
+ if size > 0 {
+ file_set
+ .manifest_files
+ .entry(statistics_name.to_string())
+ .or_insert(size);
+ }
+ }
+
+ Ok(Some(file_set))
+}
+
+/// Load snapshot `snapshot_id` through `sm`, treating a missing snapshot as `Ok(None)`.
+///
+/// Two error shapes count as "missing": an `IoUnexpected` error whose opendal kind is
+/// `NotFound`, and a `DataInvalid` error whose message contains "does not exist".
+/// Every other error is returned to the caller unchanged.
+async fn try_get_snapshot(
+    sm: &SnapshotManager,
+    snapshot_id: i64,
+) -> crate::Result<Option<crate::spec::Snapshot>> {
+    let err = match sm.get_snapshot(snapshot_id).await {
+        Ok(snapshot) => return Ok(Some(snapshot)),
+        Err(err) => err,
+    };
+
+    let snapshot_missing = match &err {
+        crate::Error::IoUnexpected { source, .. } => {
+            source.kind() == opendal::ErrorKind::NotFound
+        }
+        crate::Error::DataInvalid { message, .. } => message.contains("does not exist"),
+        _ => false,
+    };
+
+    if snapshot_missing {
+        Ok(None)
+    } else {
+        Err(err)
+    }
+}
+
+/// Read and decode the manifest list stored at `path`.
+///
+/// Returns the decoded metas together with the number of bytes read. When the read
+/// fails with an opendal `NotFound`, an empty list with size 0 is returned instead of
+/// an error; any other failure (including a decode failure) is propagated.
+async fn try_read_manifest_list_with_size(
+    file_io: &FileIO,
+    path: &str,
+) -> crate::Result<(Vec<ManifestFileMeta>, i64)> {
+    let input = file_io.new_input(path)?;
+    let bytes = match input.read().await {
+        Err(crate::Error::IoUnexpected { ref source, .. })
+            if source.kind() == opendal::ErrorKind::NotFound =>
+        {
+            return Ok((Vec::new(), 0));
+        }
+        Err(other) => return Err(other),
+        Ok(bytes) => bytes,
+    };
+
+    let size = bytes.len() as i64;
+    let metas = crate::spec::avro::from_avro_bytes_fast(&bytes)?;
+    Ok((metas, size))
+}
+
+/// Read the manifest at `path`, mapping an opendal `NotFound` failure to an empty
+/// entry list. All other errors are returned unchanged.
+async fn try_read_manifest(file_io: &FileIO, path: &str) -> crate::Result<Vec<ManifestEntry>> {
+    Manifest::read(file_io, path).await.or_else(|err| match err {
+        crate::Error::IoUnexpected { ref source, .. }
+            if source.kind() == opendal::ErrorKind::NotFound =>
+        {
+            Ok(Vec::new())
+        }
+        other => Err(other),
+    })
+}
+
+/// Look up the size of the file at `path` from its metadata.
+///
+/// A file that is reported as `NotFound` yields `Ok(0)`; any other metadata error is
+/// returned to the caller.
+async fn try_stat_file_size(file_io: &FileIO, path: &str) -> crate::Result<i64> {
+    let input = file_io.new_input(path)?;
+    let outcome = input.metadata().await;
+
+    // Guard clause: a vanished file is reported as size 0 rather than as a failure.
+    if let Err(crate::Error::IoUnexpected { source, .. }) = &outcome {
+        if source.kind() == opendal::ErrorKind::NotFound {
+            return Ok(0);
+        }
+    }
+
+    outcome.map(|status| status.size as i64)
+}
+
+/// Read the index manifest at `path` via `IndexManifest::read_with_size`.
+///
+/// Returns `(entries, file_size_in_bytes)`. An opendal `NotFound` failure is turned
+/// into an empty entry list with size 0; every other outcome is passed through as is.
+async fn try_read_index_manifest_with_size(
+    file_io: &FileIO,
+    path: &str,
+) -> crate::Result<(Vec<crate::spec::IndexManifestEntry>, i64)> {
+    match IndexManifest::read_with_size(file_io, path).await {
+        Err(crate::Error::IoUnexpected { ref source, .. })
+            if source.kind() == opendal::ErrorKind::NotFound =>
+        {
+            Ok((Vec::new(), 0))
+        }
+        outcome => outcome,
+    }
+}
+
+/// Summary of all physical files in the table directory, categorized by file type.
+///
+/// Each category has a file count and a cumulative size; both start at zero via
+/// `Default` and are incremented once per file as files are accumulated.
+#[derive(Debug, Clone, Default)]
+pub struct PhysicalFilesSummary {
+ /// Number of files classified as manifest files.
+ pub manifest_file_count: i64,
+ /// Sum of the sizes of the files classified as manifest files.
+ pub manifest_file_size: i64,
+ /// Number of files classified as data files.
+ pub data_file_count: i64,
+ /// Sum of the sizes of the files classified as data files.
+ pub data_file_size: i64,
+ /// Number of files classified as index files.
+ pub index_file_count: i64,
+ /// Sum of the sizes of the files classified as index files.
+ pub index_file_size: i64,
+}
+
+/// Map a bare file name to its [`FileType`] based on the name's prefix.
+///
+/// Names starting with `manifest-`, `index-manifest-` or `statistics-` are manifest
+/// files; any other name starting with `index-` is an index file; everything else is
+/// classified as data.
+fn classify_file_name(file_name: &str) -> FileType {
+    const MANIFEST_PREFIXES: [&str; 3] = ["manifest-", "index-manifest-", "statistics-"];
+
+    let has_manifest_prefix = MANIFEST_PREFIXES
+        .iter()
+        .any(|prefix| file_name.starts_with(*prefix));
+    if has_manifest_prefix {
+        return FileType::Manifest;
+    }
+
+    // `index-manifest-` was already handled above, so a remaining `index-` is an index file.
+    if file_name.starts_with("index-") {
+        return FileType::Index;
+    }
+
+    FileType::Data
+}
+
+/// Category assigned to a file by `classify_file_name`; selects which pair of
+/// counters in `PhysicalFilesSummary` a file contributes to.
+enum FileType {
+ /// Names prefixed with `manifest-`, `index-manifest-` or `statistics-`.
+ Manifest,
+ /// Any name that matches neither the manifest nor the index prefixes.
+ Data,
+ /// Names prefixed with `index-` (other than `index-manifest-`).
+ Index,
+}
+
+/// Upper bound on concurrently running subdirectory listings; passed to
+/// `buffer_unordered` in `collect_physical_files_summary`.
+const DIR_LIST_CONCURRENCY: usize = 32;
+
+/// Scan the table directory and compute total file sizes grouped by type.
+///
+/// First lists the top-level entries of `table_location`, then concurrently lists
+/// each top-level subdirectory recursively (up to `DIR_LIST_CONCURRENCY` listings in
+/// flight) to maximize throughput on object stores with many partition directories.
+///
+/// Files are classified by their file name prefix:
+/// - `manifest-*` / `index-manifest-*` / `statistics-*` → manifest
+/// - `index-*` (excluding `index-manifest-*`) → index
+/// - Everything else → data
+///
+/// Only files contribute to the summary; directory entries are never counted.
+///
+/// A table directory that does not exist yields an all-zero summary, and a
+/// subdirectory that is reported as `NotFound` while being listed is treated as empty.
+///
+/// # Errors
+///
+/// Returns the first listing error that is not an opendal `NotFound`.
+pub async fn collect_physical_files_summary(
+    file_io: &FileIO,
+    table_location: &str,
+) -> crate::Result<PhysicalFilesSummary> {
+    // List top-level entries to discover subdirectories and top-level files
+    let top_entries = match file_io.list_status(table_location).await {
+        Ok(s) => s,
+        Err(crate::Error::IoUnexpected { ref source, .. })
+            if source.kind() == opendal::ErrorKind::NotFound =>
+        {
+            return Ok(PhysicalFilesSummary::default());
+        }
+        Err(e) => return Err(e),
+    };
+
+    let mut summary = PhysicalFilesSummary::default();
+
+    // Classify top-level files directly; remember directories for the recursive pass
+    let mut sub_dirs = Vec::new();
+    for entry in &top_entries {
+        if entry.is_dir {
+            sub_dirs.push(entry.path.clone());
+        } else {
+            let file_name = entry.path.rsplit('/').next().unwrap_or(&entry.path);
+            accumulate_file(&mut summary, file_name, entry.size);
+        }
+    }
+
+    // Concurrently list each subdirectory recursively
+    let dir_results: Vec<crate::Result<Vec<crate::io::FileStatus>>> = stream::iter(sub_dirs)
+        .map(|dir_path| async move {
+            match file_io.list_status_recursive(&dir_path).await {
+                Ok(s) => Ok(s),
+                // The directory may have been removed between the two listings
+                Err(crate::Error::IoUnexpected { ref source, .. })
+                    if source.kind() == opendal::ErrorKind::NotFound =>
+                {
+                    Ok(Vec::new())
+                }
+                Err(e) => Err(e),
+            }
+        })
+        .buffer_unordered(DIR_LIST_CONCURRENCY)
+        .collect()
+        .await;
+
+    for result in dir_results {
+        let statuses = result?;
+        for status in &statuses {
+            // Mirror the top-level handling: a directory entry is not a file. Whether
+            // the recursive listing can return directory entries is not visible from
+            // here, so guard against it rather than count a directory as a data file.
+            if status.is_dir {
+                continue;
+            }
+            let file_name = status.path.rsplit('/').next().unwrap_or(&status.path);
+            accumulate_file(&mut summary, file_name, status.size);
+        }
+    }
+
+    Ok(summary)
+}
+
+/// Add one file of `size` bytes to the counters of the category that
+/// `classify_file_name` assigns to `file_name`.
+fn accumulate_file(summary: &mut PhysicalFilesSummary, file_name: &str, size: u64) {
+    // Pick the (count, size) pair for the file's category, then update both in one place.
+    let (count, total_size) = match classify_file_name(file_name) {
+        FileType::Manifest => (
+            &mut summary.manifest_file_count,
+            &mut summary.manifest_file_size,
+        ),
+        FileType::Data => (&mut summary.data_file_count, &mut summary.data_file_size),
+        FileType::Index => (&mut summary.index_file_count, &mut summary.index_file_size),
+    };
+
+    *count += 1;
+    *total_size += size as i64;
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::{CommitKind, Snapshot};
+ use crate::table::{BranchManager, SnapshotManager, TagManager};
+
+ /// Build a `FileIO` from the "memory" scheme for use in the tests below.
+ fn test_file_io() -> FileIO {
+ FileIOBuilder::new("memory").build().unwrap()
+ }
+
+ /// With nothing written under the table path, the summary still contains the
+ /// `total` and `branch:main` rows, both reporting zero data files.
+ #[tokio::test]
+ async fn test_collect_empty_table() {
+ let file_io = test_file_io();
+ let result =
+ collect_referenced_files_summary(&file_io,
"memory:/test_empty_table", &[], &[])
+ .await
+ .unwrap();
+ // total + branch:main
+ assert_eq!(result.len(), 2);
+ assert_eq!(result[0].source, "total");
+ assert_eq!(result[0].data_file_count, 0);
+ assert_eq!(result[1].source, "branch:main");
+ assert_eq!(result[1].data_file_count, 0);
+ }
+
+ /// A snapshot whose manifest lists do not exist must not fail the collection:
+ /// both rows report zero manifest files and zero data files.
+ #[tokio::test]
+ async fn test_collect_with_missing_manifest() {
+ let table_path = "memory:/test_missing_manifest";
+ let file_io = test_file_io();
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+
+ let sm = SnapshotManager::new(file_io.clone(), table_path.to_string());
+
+ // Create a snapshot that references non-existent manifest lists
+ let snapshot = Snapshot::builder()
+ .version(3)
+ .id(1)
+ .schema_id(0)
+ .base_manifest_list("non-existent-base".to_string())
+ .delta_manifest_list("non-existent-delta".to_string())
+ .commit_user("test".to_string())
+ .commit_identifier(0)
+ .commit_kind(CommitKind::APPEND)
+ .time_millis(1000)
+ .build();
+ sm.commit_snapshot(&snapshot).await.unwrap();
+
+ let result = collect_referenced_files_summary(&file_io, table_path,
&[], &[])
+ .await
+ .unwrap();
+ // total + branch:main
+ assert_eq!(result.len(), 2);
+ assert_eq!(result[0].source, "total");
+ assert_eq!(result[0].manifest_file_count, 0);
+ assert_eq!(result[0].data_file_count, 0);
+ assert_eq!(result[1].source, "branch:main");
+ assert_eq!(result[1].manifest_file_count, 0);
+ assert_eq!(result[1].data_file_count, 0);
+ }
+
+ /// Files reachable only through a tag created under branch `b1` must be reported
+ /// in the `branch:b1` row and be included in the `total` row.
+ #[tokio::test]
+ async fn test_branch_tag_referenced_files() {
+ use crate::spec::stats::BinaryTableStats;
+ use crate::spec::{DataFileMeta, FileKind, Manifest, ManifestFileMeta,
ManifestList};
+
+ let table_path = "memory:/test_branch_tag";
+ let file_io = test_file_io();
+
+ file_io
+ .mkdirs(&format!("{table_path}/snapshot/"))
+ .await
+ .unwrap();
+ file_io
+ .mkdirs(&format!("{table_path}/manifest/"))
+ .await
+ .unwrap();
+
+ let sm = SnapshotManager::new(file_io.clone(), table_path.to_string());
+ let empty_stats = BinaryTableStats::new(vec![0u8; 8], vec![0u8; 8],
vec![Some(0)]);
+
+ // Write a manifest file (referenced by branch tag only) at the TABLE ROOT
+ let manifest_name = "manifest-branch-only-1";
+ let manifest_path = format!("{table_path}/manifest/{manifest_name}");
+ let data_file = DataFileMeta {
+ file_name: "data-branch-tag-file-1.parquet".to_string(),
+ file_size: 4096,
+ row_count: 100,
+ min_key: vec![],
+ max_key: vec![],
+ key_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+ value_stats: BinaryTableStats::new(vec![], vec![], vec![]),
+ min_sequence_number: 0,
+ max_sequence_number: 0,
+ schema_id: 0,
+ level: 0,
+ extra_files: vec![],
+ creation_time: None,
+ delete_row_count: Some(0),
+ embedded_index: None,
+ file_source: None,
+ value_stats_cols: None,
+ external_path: None,
+ first_row_id: None,
+ write_cols: None,
+ };
+ let entry = ManifestEntry::new(FileKind::Add, vec![0u8; 12], 0, 1,
data_file, 2);
+ Manifest::write(&file_io, &manifest_path, &[entry])
+ .await
+ .unwrap();
+
+ // Write a manifest list that references the above manifest (at the table root)
+ let manifest_list_name = "manifest-list-branch-tag-base";
+ let manifest_list_path =
format!("{table_path}/manifest/{manifest_list_name}");
+ let manifest_meta =
+ ManifestFileMeta::new(manifest_name.to_string(), 512, 1, 0,
empty_stats.clone(), 0);
+ ManifestList::write(&file_io, &manifest_list_path, &[manifest_meta])
+ .await
+ .unwrap();
+
+ // Write an empty delta manifest list at the table root
+ let delta_list_name = "manifest-list-branch-tag-delta";
+ let delta_list_path =
format!("{table_path}/manifest/{delta_list_name}");
+ ManifestList::write(&file_io, &delta_list_path, &[])
+ .await
+ .unwrap();
+
+ // Create a main branch snapshot (with non-existent manifest lists)
+ let main_snapshot = Snapshot::builder()
+ .version(3)
+ .id(1)
+ .schema_id(0)
+ .base_manifest_list("manifest-list-main-base".to_string())
+ .delta_manifest_list("manifest-list-main-delta".to_string())
+ .commit_user("test".to_string())
+ .commit_identifier(0)
+ .commit_kind(CommitKind::APPEND)
+ .time_millis(1000)
+ .build();
+ sm.commit_snapshot(&main_snapshot).await.unwrap();
+
+ // Create branch b1 with NO snapshots
+ let bm = BranchManager::new(file_io.clone(), table_path.to_string());
+ bm.create_branch("b1").await.unwrap();
+
+ // Create a tag under branch b1 that references the readable manifest lists
+ let branch_tm = TagManager::new(file_io.clone(),
table_path.to_string()).with_branch("b1");
+ let branch_tag_snapshot = Snapshot::builder()
+ .version(3)
+ .id(100)
+ .schema_id(0)
+ .base_manifest_list(manifest_list_name.to_string())
+ .delta_manifest_list(delta_list_name.to_string())
+ .commit_user("test".to_string())
+ .commit_identifier(0)
+ .commit_kind(CommitKind::APPEND)
+ .time_millis(2000)
+ .build();
+ branch_tm.create("v1", &branch_tag_snapshot).await.unwrap();
+
+ let result = collect_referenced_files_summary(&file_io, table_path,
&[], &[])
+ .await
+ .unwrap();
+
+ // Should have: total, branch:main, branch:b1
+ assert_eq!(result.len(), 3);
+ assert_eq!(result[0].source, "total");
+ assert_eq!(result[1].source, "branch:main");
+ assert_eq!(result[2].source, "branch:b1");
+
+ // branch:b1 must have non-zero counts from the branch tag's readable manifests.
+ // The manifest list + manifest file + delta manifest list = 3 manifest files.
+ assert!(
+ result[2].manifest_file_count > 0,
+ "branch:b1 must have manifest files from branch tag, got {}",
+ result[2].manifest_file_count
+ );
+ assert!(
+ result[2].manifest_file_size > 0,
+ "branch:b1 must have non-zero manifest file size, got {}",
+ result[2].manifest_file_size
+ );
+ // The manifest references one data file
+ assert_eq!(result[2].data_file_count, 1);
+ assert_eq!(result[2].data_file_size, 4096);
+
+ // total should include branch:b1's files
+ assert!(result[0].data_file_count >= 1);
+ assert!(result[0].data_file_size >= 4096);
+ }
+}
diff --git a/docs/src/sql.md b/docs/src/sql.md
index b075988..bdad090 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -537,6 +537,105 @@ SELECT * FROM full_text_search('paimon.my_db.docs',
'content', 'paimon search',
The function searches across all Tantivy full-text index files for the target
column, merges results by relevance score, and returns the top-k matching rows.
If no matching index is found, an empty result is returned.
+## Referenced Files Size
+
+The `referenced_files_size` table-valued function computes aggregated
manifest/data/index file size summaries for all snapshots referenced by a
table, including snapshots from the main branch, tags, and other branches. This
is useful for understanding storage usage and for orphan file analysis.
+
+Historical snapshots may be in the process of being cleaned up — if a manifest
file has already been deleted, it is gracefully skipped (counted as 0
files/bytes).
+
+### Registration
+
+```rust
+use paimon_datafusion::register_referenced_files_size;
+
+register_referenced_files_size(&ctx, catalog.clone(), "default");
+```
+
+### Usage
+
+```sql
+SELECT * FROM referenced_files_size('table_name')
+```
+
+| Argument | Type | Description |
+|---|---|---|
+| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or
short form |
+
+Example:
+
+```sql
+SELECT * FROM referenced_files_size('paimon.my_db.orders');
+```
+
+### Output Schema
+
+| Column | Type | Description |
+|---|---|---|
+| `source` | STRING | Scope: `total` or `branch:<name>` |
+| `manifest_file_count` | BIGINT | Number of manifest files |
+| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
+| `data_file_count` | BIGINT | Number of data files |
+| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `index_file_count` | BIGINT | Number of index files |
+| `index_file_size` | BIGINT | Total size of index files (bytes) |
+
+The output contains one row per scope:
+- `total` — sum across all branches and tags
+- `branch:main` — main branch snapshots + tag snapshots
+- `branch:<name>` — one row per other branch
+
+To get the total referenced size:
+
+```sql
+SELECT manifest_file_size + data_file_size + index_file_size AS total_size
+FROM referenced_files_size('paimon.my_db.orders')
+WHERE source = 'total';
+```
+
+## Physical Files Size
+
+The `physical_files_size` table-valued function scans the table directory recursively and computes the total size of all physical files on disk, categorized by file type. Comparing its output with that of `referenced_files_size` shows how much storage is occupied by orphan files, i.e. files that are no longer referenced by any snapshot.
+
+Files are classified by their file name prefix:
+- `manifest-*` / `index-manifest-*` / `statistics-*` → manifest
+- `index-*` (excluding `index-manifest-*`) → index
+- Everything else → data
+
+### Registration
+
+```rust
+use paimon_datafusion::register_physical_files_size;
+
+register_physical_files_size(&ctx, catalog.clone(), "default");
+```
+
+### Usage
+
+```sql
+SELECT * FROM physical_files_size('table_name')
+```
+
+| Argument | Type | Description |
+|---|---|---|
+| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or
short form |
+
+Example:
+
+```sql
+SELECT * FROM physical_files_size('paimon.my_db.orders');
+```
+
+### Output Schema
+
+| Column | Type | Description |
+|---|---|---|
+| `manifest_file_count` | BIGINT | Number of manifest files on disk |
+| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
+| `data_file_count` | BIGINT | Number of data files on disk |
+| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `index_file_count` | BIGINT | Number of index files on disk |
+| `index_file_size` | BIGINT | Total size of index files (bytes) |
+
## Time Travel
Paimon supports time travel queries to read historical data.