QuakeWang commented on code in PR #323: URL: https://github.com/apache/paimon-rust/pull/323#discussion_r3257599638
########## crates/paimon/src/table/referenced_files.rs: ########## @@ -0,0 +1,592 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect deduplicated referenced file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{IndexManifest, Manifest, ManifestEntry, ManifestFileMeta}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files (deduplicated). +/// +/// Each row represents the unique referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"branch:main"`: main branch snapshots + tags +/// - `"branch:<name>"`: a specific branch +/// +/// Files are deduplicated by file name within each scope, so the sum +/// represents actual disk usage that is still referenced (protected from cleanup). 
+/// Both ADD and DELETE manifest entries are included since both reference +/// physical files that cannot be removed until the snapshot expires. +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +/// Deduplicated file set for a scope, keyed by file name. +#[derive(Default)] +struct ScopeFileSet { + manifest_files: HashMap<String, i64>, + data_files: HashMap<String, i64>, + index_files: HashMap<String, i64>, +} + +impl ScopeFileSet { + fn to_summary(&self, source: &str) -> ReferencedFilesSummary { + ReferencedFilesSummary { + source: source.to_string(), + manifest_file_count: self.manifest_files.len() as i64, + manifest_file_size: self.manifest_files.values().sum(), + data_file_count: self.data_files.len() as i64, + data_file_size: self.data_files.values().sum(), + index_file_count: self.index_files.len() as i64, + index_file_size: self.index_files.values().sum(), + } + } + + fn merge(&mut self, other: &ScopeFileSet) { + for (k, v) in &other.manifest_files { + self.manifest_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.data_files { + self.data_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.index_files { + self.index_files.entry(k.clone()).or_insert(*v); + } + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached data file entries (file_name, file_size) per manifest file full path. +type ManifestCache = Mutex<HashMap<String, Vec<(String, i64)>>>; + +/// Collect per-scope deduplicated referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all referenced files from main branch, tags, and branches +/// 2. `"branch:main"` — main branch snapshots + tag snapshots +/// 3. 
`"branch:<name>"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Files are deduplicated by name within each scope to produce an accurate +/// count of unique referenced files. Both ADD and DELETE entries are included +/// since both reference physical files protected from cleanup. +/// +/// Manifest list files and index manifest files are counted as manifest files, +/// consistent with `physical_files_size` classification. +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, +) -> crate::Result<Vec<ReferencedFilesSummary>> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + + // 1. Main branch snapshots + tags + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let mut main_files = collect_scope_files(file_io, &sm, manifest_cache_ref).await?; + + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + let tag_files = collect_tag_files(file_io, &sm, &tm, manifest_cache_ref).await?; + main_files.merge(&tag_files); + + // 2. Branch file sets + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + let mut branch_file_sets = Vec::new(); + for branch_name in &branch_names { + let branch_sm = sm.with_branch(branch_name); + let branch_files = collect_scope_files(file_io, &branch_sm, manifest_cache_ref).await?; + branch_file_sets.push((branch_name.clone(), branch_files)); + } + + // 3. 
Assemble output: total, main, branches + let mut total_files = ScopeFileSet::default(); + total_files.merge(&main_files); + for (_, bs) in &branch_file_sets { + total_files.merge(bs); + } + + let mut result = vec![ + total_files.to_summary("total"), + main_files.to_summary("branch:main"), + ]; + for (name, files) in &branch_file_sets { + result.push(files.to_summary(&format!("branch:{name}"))); + } + Ok(result) +} + +async fn collect_scope_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_cache: &ManifestCache, +) -> crate::Result<ScopeFileSet> { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec<Option<ScopeFileSet>> = + stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_files(file_io, &sm, snapshot_id, manifest_cache).await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut merged = ScopeFileSet::default(); + for fs in per_snapshot.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_tag_files( + file_io: &FileIO, + sm: &SnapshotManager, + tm: &TagManager, + manifest_cache: &ManifestCache, +) -> crate::Result<ScopeFileSet> { + let tag_names = tm.list_all_names().await?; + let mut merged = ScopeFileSet::default(); + + for tag_name in &tag_names { + let snapshot = match tm.get(tag_name).await? { + Some(s) => s, + None => continue, + }; + if let Some(fs) = collect_snapshot_files(file_io, sm, &snapshot, manifest_cache).await? { + merged.merge(&fs); + } + } + + Ok(merged) +} + +async fn collect_single_snapshot_files( + file_io: &FileIO, + sm: &SnapshotManager, + snapshot_id: i64, + manifest_cache: &ManifestCache, +) -> crate::Result<Option<ScopeFileSet>> { + let snapshot = match try_get_snapshot(sm, snapshot_id).await? 
{ + Some(s) => s, + None => return Ok(None), + }; + + collect_snapshot_files(file_io, sm, &snapshot, manifest_cache).await +} + +async fn collect_snapshot_files( + file_io: &FileIO, + sm: &SnapshotManager, + snapshot: &crate::spec::Snapshot, + manifest_cache: &ManifestCache, +) -> crate::Result<Option<ScopeFileSet>> { + let mut file_set = ScopeFileSet::default(); + + // Collect manifest list file names (these are manifest-type files themselves) + let mut manifest_list_names = vec![ + snapshot.base_manifest_list().to_string(), + snapshot.delta_manifest_list().to_string(), + ]; + if let Some(cl) = snapshot.changelog_manifest_list() { + manifest_list_names.push(cl.to_string()); + } + + // Pre-compute paths + let manifest_list_paths: Vec<String> = manifest_list_names + .iter() + .map(|name| sm.manifest_path(name)) + .collect(); + + // Read all manifest lists concurrently and record their sizes + let manifest_list_futures: Vec<_> = manifest_list_paths + .iter() + .map(|path| try_read_manifest_list_with_size(file_io, path)) + .collect(); + let manifest_list_results = try_join_all(manifest_list_futures).await?; + + // Register manifest list files themselves as manifest files + for (name, (_, size)) in manifest_list_names.iter().zip(&manifest_list_results) { + if *size > 0 { + file_set.manifest_files.entry(name.clone()).or_insert(*size); + } + } + + // Flatten all manifest file metas from all manifest lists + let all_manifest_metas: Vec<&ManifestFileMeta> = manifest_list_results + .iter() + .flat_map(|(metas, _)| metas.iter()) + .collect(); + + // Register manifest files + for meta in &all_manifest_metas { + file_set + .manifest_files + .entry(meta.file_name().to_string()) + .or_insert(meta.file_size()); + } + + // Read manifest files to get data file entries, using cache by full path + let manifest_paths: Vec<String> = all_manifest_metas + .iter() + .map(|meta| sm.manifest_path(meta.file_name())) + .collect(); + + let uncached_indices: Vec<usize> = manifest_paths + 
.iter() + .enumerate() + .filter(|(_, path)| { + let cache = manifest_cache.lock().unwrap(); + !cache.contains_key(path.as_str()) + }) + .map(|(i, _)| i) + .collect(); + + if !uncached_indices.is_empty() { + let uncached_paths: Vec<&str> = uncached_indices + .iter() + .map(|&i| manifest_paths[i].as_str()) + .collect(); + + let manifest_futures: Vec<_> = uncached_paths + .iter() + .map(|path| try_read_manifest(file_io, path)) + .collect(); + let results = try_join_all(manifest_futures).await?; + + let mut cache = manifest_cache.lock().unwrap(); + for (path, entries) in uncached_paths.into_iter().zip(results) { + let file_entries: Vec<(String, i64)> = entries + .iter() + .map(|e| (e.file().file_name.clone(), e.file().file_size)) Review Comment: The ADD/DELETE semantics look clearer now, but this still adds only `file_name` to the referenced data-file set. Should we also include `e.file().extra_files` here? Java's orphan-file cleanup treats extra files as used files too; otherwise, file-index, blob, and lookup-related files would show up in the orphan delta when compared against `physical_files_size`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
