QuakeWang commented on code in PR #323: URL: https://github.com/apache/paimon-rust/pull/323#discussion_r3258920509
########## crates/paimon/src/table/referenced_files.rs: ########## @@ -0,0 +1,897 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect deduplicated referenced file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{ + bucket_dir_name, BinaryRow, DataField, IndexManifest, Manifest, ManifestEntry, + ManifestFileMeta, PartitionComputer, +}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files (deduplicated). +/// +/// Each row represents the unique referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"branch:main"`: main branch snapshots + tags +/// - `"branch:<name>"`: a specific branch +/// +/// Files are deduplicated by file name within each scope, so the sum +/// represents actual disk usage that is still referenced (protected from cleanup). +/// Both ADD and DELETE manifest entries are included since both reference +/// physical files that cannot be removed until the snapshot expires. +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +/// Deduplicated file set for a scope, keyed by file name. +#[derive(Default)] +struct ScopeFileSet { + manifest_files: HashMap<String, i64>, + data_files: HashMap<String, i64>, + index_files: HashMap<String, i64>, +} + +impl ScopeFileSet { + fn to_summary(&self, source: &str) -> ReferencedFilesSummary { + ReferencedFilesSummary { + source: source.to_string(), + manifest_file_count: self.manifest_files.len() as i64, + manifest_file_size: self.manifest_files.values().sum(), + data_file_count: self.data_files.len() as i64, + data_file_size: self.data_files.values().sum(), + index_file_count: self.index_files.len() as i64, + index_file_size: self.index_files.values().sum(), + } + } + + fn merge(&mut self, other: &ScopeFileSet) { + for (k, v) in &other.manifest_files { + self.manifest_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.data_files { + self.data_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.index_files { + self.index_files.entry(k.clone()).or_insert(*v); + } + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached data file entries (file_name, file_size) per manifest file full path. +type ManifestCache = Mutex<HashMap<String, Vec<(String, i64)>>>; + +/// Resolves extra file paths for stat-ing their real sizes. +struct ExtraFileResolver { + table_location: String, + partition_computer: Option<PartitionComputer>, +} + +impl ExtraFileResolver { + fn new(table_location: &str, partition_keys: &[String], schema_fields: &[DataField]) -> Self { + let partition_computer = if partition_keys.is_empty() { + None + } else { + PartitionComputer::new( + partition_keys, + schema_fields, + "__DEFAULT_PARTITION__", + false, + ) + .ok() + }; + Self { + table_location: table_location.to_string(), + partition_computer, + } + } + + fn resolve_extra_file_path( + &self, + partition_bytes: &[u8], + bucket: i32, + extra_file_name: &str, + ) -> Option<String> { + let partition_path = if let Some(ref computer) = self.partition_computer { + let row = BinaryRow::from_serialized_bytes(partition_bytes).ok()?; + computer.generate_partition_path(&row).ok()? + } else { + String::new() + }; + let bucket_dir = bucket_dir_name(bucket); + Some(format!( + "{}/{}{}/{}", + self.table_location, partition_path, bucket_dir, extra_file_name + )) + } +} + +/// Collect per-scope deduplicated referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all referenced files from main branch, tags, and branches +/// 2. `"branch:main"` — main branch snapshots + tag snapshots +/// 3. `"branch:<name>"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Files are deduplicated by name within each scope to produce an accurate +/// count of unique referenced files. Both ADD and DELETE entries are included +/// since both reference physical files protected from cleanup. +/// +/// Manifest list files and index manifest files are counted as manifest files, +/// consistent with `physical_files_size` classification. +/// +/// Extra files referenced by data file entries are stat-ed to obtain their +/// real sizes, using partition/bucket info to construct full paths. +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, + partition_keys: &[String], + schema_fields: &[DataField], +) -> crate::Result<Vec<ReferencedFilesSummary>> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + let extra_resolver = ExtraFileResolver::new(table_location, partition_keys, schema_fields); + let extra_resolver_ref = &extra_resolver; + + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + + // 1. Main branch snapshots + tags (concurrently) + // For main branch, snapshot reading and manifest resolution both use root SM. + let (main_files, tag_files) = tokio::try_join!( + collect_scope_files(file_io, &sm, &sm, manifest_cache_ref, extra_resolver_ref), + collect_tag_files( + file_io, + &sm, + &sm, + &tm, + manifest_cache_ref, + extra_resolver_ref + ), + )?; + let mut main_files = main_files; + main_files.merge(&tag_files); + + // 2. Branch file sets (all branches concurrently) + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + + let sm_ref = &sm; + let branch_futures: Vec<_> = branch_names + .iter() + .map(|branch_name| { + let branch_sm = sm.with_branch(branch_name); + let branch_tm = tm.with_branch(branch_name); + async move { + // Branch SM reads snapshot/tag files from branch path, + // but manifest paths are always resolved from the table root. + let (mut branch_files, branch_tag_files) = tokio::try_join!( + collect_scope_files( + file_io, + &branch_sm, + sm_ref, + manifest_cache_ref, + extra_resolver_ref + ), + collect_tag_files( + file_io, + &branch_sm, + sm_ref, + &branch_tm, + manifest_cache_ref, + extra_resolver_ref + ), + )?; + branch_files.merge(&branch_tag_files); + Ok::<_, crate::Error>(branch_files) + } + }) + .collect(); + let branch_results = try_join_all(branch_futures).await?; + + // 3. Assemble output: total, main, branches + let mut total_files = ScopeFileSet::default(); + total_files.merge(&main_files); + for bs in &branch_results { + total_files.merge(bs); + } + + let mut result = vec![ + total_files.to_summary("total"), + main_files.to_summary("branch:main"), + ]; + for (name, files) in branch_names.iter().zip(&branch_results) { + result.push(files.to_summary(&format!("branch:{name}"))); + } + Ok(result) +} + +async fn collect_scope_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<ScopeFileSet> { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec<Option<ScopeFileSet>> = stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_files( + file_io, + &sm, + manifest_sm, + snapshot_id, + manifest_cache, + extra_resolver, + ) + .await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut merged = ScopeFileSet::default(); + for fs in per_snapshot.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_tag_files( + file_io: &FileIO, + _sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + tm: &TagManager, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<ScopeFileSet> { + let tag_names = tm.list_all_names().await?; + + let tag_futures: Vec<_> = tag_names + .iter() + .map(|tag_name| async move { + let snapshot = match tm.get(tag_name).await? { + Some(s) => s, + None => return Ok(None), + }; + collect_snapshot_files( + file_io, + manifest_sm, + &snapshot, + manifest_cache, + extra_resolver, + ) + .await + }) + .collect(); + let tag_results = try_join_all(tag_futures).await?; + + let mut merged = ScopeFileSet::default(); + for fs in tag_results.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_single_snapshot_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + snapshot_id: i64, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<Option<ScopeFileSet>> { + let snapshot = match try_get_snapshot(sm, snapshot_id).await? { + Some(s) => s, + None => return Ok(None), + }; + + collect_snapshot_files( + file_io, + manifest_sm, + &snapshot, + manifest_cache, + extra_resolver, + ) + .await +} + +async fn collect_snapshot_files( + file_io: &FileIO, + manifest_sm: &SnapshotManager, + snapshot: &crate::spec::Snapshot, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<Option<ScopeFileSet>> { + let mut file_set = ScopeFileSet::default(); + + // Collect manifest list file names (these are manifest-type files themselves) + let mut manifest_list_names = vec![ + snapshot.base_manifest_list().to_string(), + snapshot.delta_manifest_list().to_string(), + ]; + if let Some(cl) = snapshot.changelog_manifest_list() { + manifest_list_names.push(cl.to_string()); + } + + // Pre-compute paths (always resolved from table root) + let manifest_list_paths: Vec<String> = manifest_list_names + .iter() + .map(|name| manifest_sm.manifest_path(name)) + .collect(); + + // Read all manifest lists concurrently and record their sizes + let manifest_list_futures: Vec<_> = manifest_list_paths + .iter() + .map(|path| try_read_manifest_list_with_size(file_io, path)) + .collect(); + let manifest_list_results = try_join_all(manifest_list_futures).await?; + + // Register manifest list files themselves as manifest files + for (name, (_, size)) in manifest_list_names.iter().zip(&manifest_list_results) { + if *size > 0 { + file_set.manifest_files.entry(name.clone()).or_insert(*size); + } + } + + // Flatten all manifest file metas from all manifest lists + let all_manifest_metas: Vec<&ManifestFileMeta> = manifest_list_results + .iter() + .flat_map(|(metas, _)| metas.iter()) + .collect(); + + // Register manifest files + for meta in &all_manifest_metas { + file_set + .manifest_files + .entry(meta.file_name().to_string()) + .or_insert(meta.file_size()); + } + + // Read manifest files to get data file entries, using cache by full path + let manifest_paths: Vec<String> = all_manifest_metas + .iter() + .map(|meta| manifest_sm.manifest_path(meta.file_name())) + .collect(); + + let uncached_indices: Vec<usize> = manifest_paths + .iter() + .enumerate() + .filter(|(_, path)| { + let cache = manifest_cache.lock().unwrap(); + !cache.contains_key(path.as_str()) + }) + .map(|(i, _)| i) + .collect(); + + if !uncached_indices.is_empty() { + let uncached_paths: Vec<&str> = uncached_indices + .iter() + .map(|&i| manifest_paths[i].as_str()) + .collect(); + + let manifest_futures: Vec<_> = uncached_paths + .iter() + .map(|path| try_read_manifest(file_io, path)) + .collect(); + let results = try_join_all(manifest_futures).await?; + + // Collect extra files that need stat-ing + let mut extra_file_stat_tasks: Vec<(usize, usize, String)> = Vec::new(); + let mut all_file_entries: Vec<Vec<(String, i64)>> = Vec::with_capacity(results.len()); + + for (manifest_idx, entries) in results.iter().enumerate() { + let mut file_entries: Vec<(String, i64)> = Vec::new(); + for e in entries { + file_entries.push((e.file().file_name.clone(), e.file().file_size)); + for extra in &e.file().extra_files { + let entry_idx = file_entries.len(); + let full_path = + extra_resolver.resolve_extra_file_path(e.partition(), e.bucket(), extra); Review Comment: `extra_files` are stat-ed now, but this path still only comes from the table partition/bucket directory. For data files with `external_path`, Java aligns extra files with the main data file's actual directory (`DataFilePathFactory.toAlignedPath` uses `externalPathDir()` when present). With the current resolver, those extra files are looked up from the table root path instead, and missing files are silently counted as `0`. This should resolve extra files from the main data file's directory. ########## crates/paimon/src/table/referenced_files.rs: ########## @@ -0,0 +1,897 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect deduplicated referenced file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{ + bucket_dir_name, BinaryRow, DataField, IndexManifest, Manifest, ManifestEntry, + ManifestFileMeta, PartitionComputer, +}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files (deduplicated). +/// +/// Each row represents the unique referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"branch:main"`: main branch snapshots + tags +/// - `"branch:<name>"`: a specific branch +/// +/// Files are deduplicated by file name within each scope, so the sum +/// represents actual disk usage that is still referenced (protected from cleanup). +/// Both ADD and DELETE manifest entries are included since both reference +/// physical files that cannot be removed until the snapshot expires. +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +/// Deduplicated file set for a scope, keyed by file name. +#[derive(Default)] +struct ScopeFileSet { + manifest_files: HashMap<String, i64>, + data_files: HashMap<String, i64>, + index_files: HashMap<String, i64>, +} + +impl ScopeFileSet { + fn to_summary(&self, source: &str) -> ReferencedFilesSummary { + ReferencedFilesSummary { + source: source.to_string(), + manifest_file_count: self.manifest_files.len() as i64, + manifest_file_size: self.manifest_files.values().sum(), + data_file_count: self.data_files.len() as i64, + data_file_size: self.data_files.values().sum(), + index_file_count: self.index_files.len() as i64, + index_file_size: self.index_files.values().sum(), + } + } + + fn merge(&mut self, other: &ScopeFileSet) { + for (k, v) in &other.manifest_files { + self.manifest_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.data_files { + self.data_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.index_files { + self.index_files.entry(k.clone()).or_insert(*v); + } + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached data file entries (file_name, file_size) per manifest file full path. +type ManifestCache = Mutex<HashMap<String, Vec<(String, i64)>>>; + +/// Resolves extra file paths for stat-ing their real sizes. +struct ExtraFileResolver { + table_location: String, + partition_computer: Option<PartitionComputer>, +} + +impl ExtraFileResolver { + fn new(table_location: &str, partition_keys: &[String], schema_fields: &[DataField]) -> Self { + let partition_computer = if partition_keys.is_empty() { + None + } else { + PartitionComputer::new( + partition_keys, + schema_fields, + "__DEFAULT_PARTITION__", + false, + ) + .ok() + }; + Self { + table_location: table_location.to_string(), + partition_computer, + } + } + + fn resolve_extra_file_path( + &self, + partition_bytes: &[u8], + bucket: i32, + extra_file_name: &str, + ) -> Option<String> { + let partition_path = if let Some(ref computer) = self.partition_computer { + let row = BinaryRow::from_serialized_bytes(partition_bytes).ok()?; + computer.generate_partition_path(&row).ok()? + } else { + String::new() + }; + let bucket_dir = bucket_dir_name(bucket); + Some(format!( + "{}/{}{}/{}", + self.table_location, partition_path, bucket_dir, extra_file_name + )) + } +} + +/// Collect per-scope deduplicated referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all referenced files from main branch, tags, and branches +/// 2. `"branch:main"` — main branch snapshots + tag snapshots +/// 3. `"branch:<name>"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Files are deduplicated by name within each scope to produce an accurate +/// count of unique referenced files. Both ADD and DELETE entries are included +/// since both reference physical files protected from cleanup. +/// +/// Manifest list files and index manifest files are counted as manifest files, +/// consistent with `physical_files_size` classification. +/// +/// Extra files referenced by data file entries are stat-ed to obtain their +/// real sizes, using partition/bucket info to construct full paths. +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, + partition_keys: &[String], + schema_fields: &[DataField], +) -> crate::Result<Vec<ReferencedFilesSummary>> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + let extra_resolver = ExtraFileResolver::new(table_location, partition_keys, schema_fields); + let extra_resolver_ref = &extra_resolver; + + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + + // 1. Main branch snapshots + tags (concurrently) + // For main branch, snapshot reading and manifest resolution both use root SM. + let (main_files, tag_files) = tokio::try_join!( + collect_scope_files(file_io, &sm, &sm, manifest_cache_ref, extra_resolver_ref), + collect_tag_files( + file_io, + &sm, + &sm, + &tm, + manifest_cache_ref, + extra_resolver_ref + ), + )?; + let mut main_files = main_files; + main_files.merge(&tag_files); + + // 2. Branch file sets (all branches concurrently) + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + + let sm_ref = &sm; + let branch_futures: Vec<_> = branch_names + .iter() + .map(|branch_name| { + let branch_sm = sm.with_branch(branch_name); + let branch_tm = tm.with_branch(branch_name); + async move { + // Branch SM reads snapshot/tag files from branch path, + // but manifest paths are always resolved from the table root. + let (mut branch_files, branch_tag_files) = tokio::try_join!( + collect_scope_files( + file_io, + &branch_sm, + sm_ref, + manifest_cache_ref, + extra_resolver_ref + ), + collect_tag_files( + file_io, + &branch_sm, + sm_ref, + &branch_tm, + manifest_cache_ref, + extra_resolver_ref + ), + )?; + branch_files.merge(&branch_tag_files); + Ok::<_, crate::Error>(branch_files) + } + }) + .collect(); + let branch_results = try_join_all(branch_futures).await?; + + // 3. Assemble output: total, main, branches + let mut total_files = ScopeFileSet::default(); + total_files.merge(&main_files); + for bs in &branch_results { + total_files.merge(bs); + } + + let mut result = vec![ + total_files.to_summary("total"), + main_files.to_summary("branch:main"), + ]; + for (name, files) in branch_names.iter().zip(&branch_results) { + result.push(files.to_summary(&format!("branch:{name}"))); + } + Ok(result) +} + +async fn collect_scope_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<ScopeFileSet> { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec<Option<ScopeFileSet>> = stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_files( + file_io, + &sm, + manifest_sm, + snapshot_id, + manifest_cache, + extra_resolver, + ) + .await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut merged = ScopeFileSet::default(); + for fs in per_snapshot.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_tag_files( + file_io: &FileIO, + _sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + tm: &TagManager, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<ScopeFileSet> { + let tag_names = tm.list_all_names().await?; + + let tag_futures: Vec<_> = tag_names + .iter() + .map(|tag_name| async move { + let snapshot = match tm.get(tag_name).await? { + Some(s) => s, + None => return Ok(None), + }; + collect_snapshot_files( + file_io, + manifest_sm, + &snapshot, + manifest_cache, + extra_resolver, + ) + .await + }) + .collect(); + let tag_results = try_join_all(tag_futures).await?; + + let mut merged = ScopeFileSet::default(); + for fs in tag_results.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_single_snapshot_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + snapshot_id: i64, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<Option<ScopeFileSet>> { + let snapshot = match try_get_snapshot(sm, snapshot_id).await? { + Some(s) => s, + None => return Ok(None), + }; + + collect_snapshot_files( + file_io, + manifest_sm, + &snapshot, + manifest_cache, + extra_resolver, + ) + .await +} + +async fn collect_snapshot_files( + file_io: &FileIO, + manifest_sm: &SnapshotManager, + snapshot: &crate::spec::Snapshot, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result<Option<ScopeFileSet>> { + let mut file_set = ScopeFileSet::default(); + + // Collect manifest list file names (these are manifest-type files themselves) + let mut manifest_list_names = vec![ + snapshot.base_manifest_list().to_string(), + snapshot.delta_manifest_list().to_string(), + ]; + if let Some(cl) = snapshot.changelog_manifest_list() { + manifest_list_names.push(cl.to_string()); + } + + // Pre-compute paths (always resolved from table root) + let manifest_list_paths: Vec<String> = manifest_list_names + .iter() + .map(|name| manifest_sm.manifest_path(name)) + .collect(); + + // Read all manifest lists concurrently and record their sizes + let manifest_list_futures: Vec<_> = manifest_list_paths + .iter() + .map(|path| try_read_manifest_list_with_size(file_io, path)) + .collect(); + let manifest_list_results = try_join_all(manifest_list_futures).await?; + + // Register manifest list files themselves as manifest files + for (name, (_, size)) in manifest_list_names.iter().zip(&manifest_list_results) { + if *size > 0 { + file_set.manifest_files.entry(name.clone()).or_insert(*size); + } + } + + // Flatten all manifest file metas from all manifest lists + let all_manifest_metas: Vec<&ManifestFileMeta> = manifest_list_results + .iter() + .flat_map(|(metas, _)| metas.iter()) + .collect(); + + // Register manifest files + for meta in &all_manifest_metas { + file_set + .manifest_files + .entry(meta.file_name().to_string()) + .or_insert(meta.file_size()); + } + + // Read manifest files to get data file entries, using cache by full path + let manifest_paths: Vec<String> = all_manifest_metas + .iter() + .map(|meta| manifest_sm.manifest_path(meta.file_name())) + .collect(); + + let uncached_indices: Vec<usize> = manifest_paths + .iter() + .enumerate() + .filter(|(_, path)| { + let cache = manifest_cache.lock().unwrap(); + !cache.contains_key(path.as_str()) + }) + .map(|(i, _)| i) + .collect(); + + if !uncached_indices.is_empty() { + let uncached_paths: Vec<&str> = uncached_indices + .iter() + .map(|&i| manifest_paths[i].as_str()) + .collect(); + + let manifest_futures: Vec<_> = uncached_paths + .iter() + .map(|path| try_read_manifest(file_io, path)) + .collect(); + let results = try_join_all(manifest_futures).await?; + + // Collect extra files that need stat-ing + let mut extra_file_stat_tasks: Vec<(usize, usize, String)> = Vec::new(); + let mut all_file_entries: Vec<Vec<(String, i64)>> = Vec::with_capacity(results.len()); + + for (manifest_idx, entries) in results.iter().enumerate() { + let mut file_entries: Vec<(String, i64)> = Vec::new(); + for e in entries { + file_entries.push((e.file().file_name.clone(), e.file().file_size)); + for extra in &e.file().extra_files { + let entry_idx = file_entries.len(); + let full_path = + extra_resolver.resolve_extra_file_path(e.partition(), e.bucket(), extra); + if let Some(path) = full_path { + extra_file_stat_tasks.push((manifest_idx, entry_idx, path)); + } + file_entries.push((extra.clone(), 0)); + } + } + all_file_entries.push(file_entries); + } + + // Batch stat extra files concurrently + if !extra_file_stat_tasks.is_empty() { + let stat_futures: Vec<_> = extra_file_stat_tasks + .iter() + .map(|(_, _, path)| try_stat_file_size(file_io, path)) + .collect(); + let stat_results = try_join_all(stat_futures).await?; + + for ((manifest_idx, entry_idx, _), size) in + extra_file_stat_tasks.iter().zip(stat_results) + { + if size > 0 { + all_file_entries[*manifest_idx][*entry_idx].1 = size; + } + } + } + + let mut cache = manifest_cache.lock().unwrap(); + for (path, file_entries) in uncached_paths.into_iter().zip(all_file_entries) { + cache.insert(path.to_string(), file_entries); + } + } + + // Collect data files from cache (deduplicated by HashMap key) + { + let cache = manifest_cache.lock().unwrap(); + for path in &manifest_paths { + if let Some(entries) = cache.get(path.as_str()) { + for (name, size) in entries { + file_set.data_files.entry(name.clone()).or_insert(*size); + } + } + } + } + + // Read index manifest if present + if let Some(index_manifest_name) = snapshot.index_manifest() { + // The index manifest file itself is a manifest-type file + let index_manifest_path = manifest_sm.manifest_path(index_manifest_name); + let index_entries = + try_read_index_manifest_with_size(file_io, &index_manifest_path).await?; + + if index_entries.1 > 0 { + file_set + .manifest_files + .entry(index_manifest_name.to_string()) + .or_insert(index_entries.1); + } + + for entry in &index_entries.0 { + file_set + .index_files + .entry(entry.index_file.file_name.clone()) + .or_insert(entry.index_file.file_size as i64); + } + } + + Ok(Some(file_set)) +} + +async fn try_get_snapshot( + sm: &SnapshotManager, + snapshot_id: i64, +) -> crate::Result<Option<crate::spec::Snapshot>> { + match sm.get_snapshot(snapshot_id).await { + Ok(s) => Ok(Some(s)), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(None) + } + Err(crate::Error::DataInvalid { ref message, .. }) + if message.contains("does not exist") => + { + Ok(None) + } + Err(e) => Err(e), + } +} + +/// Read a manifest list file. Returns (entries, file_size_in_bytes). +async fn try_read_manifest_list_with_size( + file_io: &FileIO, + path: &str, +) -> crate::Result<(Vec<ManifestFileMeta>, i64)> { + let input = file_io.new_input(path)?; + match input.read().await { + Ok(bytes) => { + let size = bytes.len() as i64; + let metas = crate::spec::avro::from_avro_bytes_fast(&bytes)?; + Ok((metas, size)) + } + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok((Vec::new(), 0)) + } + Err(e) => Err(e), + } +} + +async fn try_read_manifest(file_io: &FileIO, path: &str) -> crate::Result<Vec<ManifestEntry>> { + match Manifest::read(file_io, path).await { + Ok(entries) => Ok(entries), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(Vec::new()) + } + Err(e) => Err(e), + } +} + +/// Stat a file to get its size. Returns 0 if the file is not found. +async fn try_stat_file_size(file_io: &FileIO, path: &str) -> crate::Result<i64> { + let input = file_io.new_input(path)?; + match input.metadata().await { + Ok(status) => Ok(status.size as i64), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(0) + } + Err(e) => Err(e), + } +} + +/// Read an index manifest file. Returns (entries, file_size_in_bytes). +async fn try_read_index_manifest_with_size( + file_io: &FileIO, + path: &str, +) -> crate::Result<(Vec<crate::spec::IndexManifestEntry>, i64)> { + match IndexManifest::read_with_size(file_io, path).await { + Ok(result) => Ok(result), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok((Vec::new(), 0)) + } + Err(e) => Err(e), + } +} + +/// Summary of all physical files in the table directory, categorized by file type. +#[derive(Debug, Clone, Default)] +pub struct PhysicalFilesSummary { + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +/// Categorize a file name into a file type. +fn classify_file_name(file_name: &str) -> FileType { + if file_name.starts_with("manifest-") || file_name.starts_with("index-manifest-") { + FileType::Manifest + } else if file_name.starts_with("data-") { Review Comment: This assumes physical data files always use the default `data-` prefix. Paimon supports `data-file.prefix`, and Java's `DataFilePathFactory` uses that configured prefix when creating data file names. For a table with a custom prefix, `referenced_files_size` can still count the files from manifests, but `physical_files_size` will classify the same files as `Other`, so the comparison becomes misleading. ########## docs/src/sql.md: ########## @@ -537,6 +537,105 @@ SELECT * FROM full_text_search('paimon.my_db.docs', 'content', 'paimon search', The function searches across all Tantivy full-text index files for the target column, merges results by relevance score, and returns the top-k matching rows. If no matching index is found, an empty result is returned. +## Referenced Files Size + +The `referenced_files_size` table-valued function computes aggregated file size summaries for all snapshots referenced by a table, including snapshots from the main branch, tags, and other branches. This is useful for understanding storage usage and for orphan file cleanup. Review Comment: The orphan-cleanup wording is a little broader than the current implementation. Java's orphan cleaner also protects `snapshot.statistics()`, while this PR reports only manifest/data/index files. If statistics files are intentionally out of scope here, the doc should say this is a manifest/data/index summary rather than implying full orphan-cleanup coverage. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
