QuakeWang commented on code in PR #323: URL: https://github.com/apache/paimon-rust/pull/323#discussion_r3256876538
########## crates/paimon/src/table/referenced_files.rs: ########## @@ -0,0 +1,535 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect per-snapshot file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{IndexManifest, Manifest, ManifestEntry, ManifestFileMeta}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files. +/// +/// Each row represents the total referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"main"`: main branch snapshots + tags +/// - `"branch:<name>"`: a specific branch +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +impl ReferencedFilesSummary { + fn accumulate(&mut self, other: &ReferencedFilesSummary) { + self.manifest_file_count += other.manifest_file_count; + self.manifest_file_size += other.manifest_file_size; + self.data_file_count += other.data_file_count; + self.data_file_size += other.data_file_size; + self.index_file_count += other.index_file_count; + self.index_file_size += other.index_file_size; + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached (data_file_count, data_file_size) per manifest file full path. +type ManifestCache = Mutex<HashMap<String, (i64, i64)>>; + +/// Collect per-scope referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all snapshots from main branch, tags, and branches +/// 2. `"main"` — main branch snapshots + tag snapshots +/// 3. `"branch:<name>"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Manifest files that have been deleted by concurrent cleanup are gracefully +/// skipped (treated as contributing 0 files/bytes). +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, +) -> crate::Result<Vec<ReferencedFilesSummary>> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + + // 1. Main branch snapshots + tags + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let mut main_summary = + collect_scope_summary(file_io, &sm, "branch:main", manifest_cache_ref).await?; + + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + let tag_summary = collect_tag_scope_summary(file_io, &sm, &tm, manifest_cache_ref).await?; + main_summary.accumulate(&tag_summary); + + // 2. Branch summaries + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + let mut branch_summaries = Vec::new(); + for branch_name in &branch_names { + let branch_sm = sm.with_branch(branch_name); + let branch_summary = collect_scope_summary( + file_io, + &branch_sm, + &format!("branch:{branch_name}"), + manifest_cache_ref, + ) + .await?; + branch_summaries.push(branch_summary); + } + + // 3. Assemble output: total, main, branches + let mut total = ReferencedFilesSummary { + source: "total".to_string(), + ..Default::default() + }; + total.accumulate(&main_summary); + for bs in &branch_summaries { + total.accumulate(bs); + } + + let mut result = vec![total, main_summary]; + result.extend(branch_summaries); + Ok(result) +} + +async fn collect_scope_summary( + file_io: &FileIO, + sm: &SnapshotManager, + source: &str, + manifest_cache: &ManifestCache, +) -> crate::Result<ReferencedFilesSummary> { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec<Option<ReferencedFilesSummary>> = stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_summary(file_io, &sm, snapshot_id, manifest_cache).await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut summary = ReferencedFilesSummary { + source: source.to_string(), + ..Default::default() + }; + for s in per_snapshot.into_iter().flatten() { Review Comment: A newer Paimon snapshot carries previous base/delta manifests into its new base manifest list, so the same still-referenced manifest/data/index file can appear in many snapshot summaries. Since this function is documented as comparable with physical size for orphan-file analysis, summing snapshots can make referenced size grow with snapshot count and even exceed physical size. This should collect a de-duplicated referenced file set by file identity/path first, then aggregate sizes from that set. ########## crates/paimon/src/table/referenced_files.rs: ########## @@ -0,0 +1,535 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect per-snapshot file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{IndexManifest, Manifest, ManifestEntry, ManifestFileMeta}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files. +/// +/// Each row represents the total referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"main"`: main branch snapshots + tags +/// - `"branch:<name>"`: a specific branch +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +impl ReferencedFilesSummary { + fn accumulate(&mut self, other: &ReferencedFilesSummary) { + self.manifest_file_count += other.manifest_file_count; + self.manifest_file_size += other.manifest_file_size; + self.data_file_count += other.data_file_count; + self.data_file_size += other.data_file_size; + self.index_file_count += other.index_file_count; + self.index_file_size += other.index_file_size; + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached (data_file_count, data_file_size) per manifest file full path. +type ManifestCache = Mutex<HashMap<String, (i64, i64)>>; + +/// Collect per-scope referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all snapshots from main branch, tags, and branches +/// 2. `"main"` — main branch snapshots + tag snapshots +/// 3. `"branch:<name>"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Manifest files that have been deleted by concurrent cleanup are gracefully +/// skipped (treated as contributing 0 files/bytes). +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, +) -> crate::Result<Vec<ReferencedFilesSummary>> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + + // 1. Main branch snapshots + tags + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let mut main_summary = + collect_scope_summary(file_io, &sm, "branch:main", manifest_cache_ref).await?; + + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + let tag_summary = collect_tag_scope_summary(file_io, &sm, &tm, manifest_cache_ref).await?; + main_summary.accumulate(&tag_summary); + + // 2. Branch summaries + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + let mut branch_summaries = Vec::new(); + for branch_name in &branch_names { + let branch_sm = sm.with_branch(branch_name); + let branch_summary = collect_scope_summary( + file_io, + &branch_sm, + &format!("branch:{branch_name}"), + manifest_cache_ref, + ) + .await?; + branch_summaries.push(branch_summary); + } + + // 3. Assemble output: total, main, branches + let mut total = ReferencedFilesSummary { + source: "total".to_string(), + ..Default::default() + }; + total.accumulate(&main_summary); + for bs in &branch_summaries { + total.accumulate(bs); + } + + let mut result = vec![total, main_summary]; + result.extend(branch_summaries); + Ok(result) +} + +async fn collect_scope_summary( + file_io: &FileIO, + sm: &SnapshotManager, + source: &str, + manifest_cache: &ManifestCache, +) -> crate::Result<ReferencedFilesSummary> { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec<Option<ReferencedFilesSummary>> = stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_summary(file_io, &sm, snapshot_id, manifest_cache).await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut summary = ReferencedFilesSummary { + source: source.to_string(), + ..Default::default() + }; + for s in per_snapshot.into_iter().flatten() { + summary.accumulate(&s); + } + Ok(summary) +} + +async fn collect_tag_scope_summary( + file_io: &FileIO, + sm: &SnapshotManager, + tm: &TagManager, + manifest_cache: &ManifestCache, +) -> crate::Result<ReferencedFilesSummary> { + let tag_names = tm.list_all_names().await?; + let mut summary = ReferencedFilesSummary::default(); + + for tag_name in &tag_names { + let snapshot = match tm.get(tag_name).await? { + Some(s) => s, + None => continue, + }; + if let Some(s) = collect_snapshot_summary(file_io, sm, &snapshot, manifest_cache).await? { + summary.accumulate(&s); + } + } + + Ok(summary) +} + +async fn collect_single_snapshot_summary( + file_io: &FileIO, + sm: &SnapshotManager, + snapshot_id: i64, + manifest_cache: &ManifestCache, +) -> crate::Result<Option<ReferencedFilesSummary>> { + let snapshot = match try_get_snapshot(sm, snapshot_id).await? { + Some(s) => s, + None => return Ok(None), + }; + + collect_snapshot_summary(file_io, sm, &snapshot, manifest_cache).await +} + +async fn collect_snapshot_summary( + file_io: &FileIO, + sm: &SnapshotManager, + snapshot: &crate::spec::Snapshot, + manifest_cache: &ManifestCache, +) -> crate::Result<Option<ReferencedFilesSummary>> { + let mut summary = ReferencedFilesSummary::default(); + + // Collect manifest list file names + let mut manifest_list_names = vec![ + snapshot.base_manifest_list().to_string(), + snapshot.delta_manifest_list().to_string(), + ]; + if let Some(cl) = snapshot.changelog_manifest_list() { + manifest_list_names.push(cl.to_string()); + } + + // Pre-compute paths so futures can borrow them + let manifest_list_paths: Vec<String> = manifest_list_names + .iter() + .map(|name| sm.manifest_path(name)) + .collect(); + + // Read all manifest lists concurrently + let manifest_list_futures: Vec<_> = manifest_list_paths + .iter() + .map(|path| try_read_manifest_list(file_io, path)) + .collect(); + let manifest_lists = try_join_all(manifest_list_futures).await?; + + // Flatten all manifest file metas from all manifest lists + let all_manifest_metas: Vec<&ManifestFileMeta> = + manifest_lists.iter().flat_map(|ml| ml.iter()).collect(); + + summary.manifest_file_count = all_manifest_metas.len() as i64; Review Comment: This code only counts `ManifestFileMeta` entries inside the manifest lists, but the files named by `snapshot.base_manifest_list()`, `delta_manifest_list()`, and `changelog_manifest_list()` are themselves snapshot-referenced metadata files. The `snapshot.index_manifest()` file is also read below but not counted as a referenced manifest file. Since `physical_files_size` classifies both `manifest-list-*` and `index-manifest-*` as manifest files, the two functions cannot be safely compared until these directly referenced metadata files are included and de-duplicated. ########## crates/paimon/src/table/referenced_files.rs: ########## @@ -0,0 +1,535 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect per-snapshot file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{IndexManifest, Manifest, ManifestEntry, ManifestFileMeta}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files. +/// +/// Each row represents the total referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"main"`: main branch snapshots + tags +/// - `"branch:<name>"`: a specific branch +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +impl ReferencedFilesSummary { + fn accumulate(&mut self, other: &ReferencedFilesSummary) { + self.manifest_file_count += other.manifest_file_count; + self.manifest_file_size += other.manifest_file_size; + self.data_file_count += other.data_file_count; + self.data_file_size += other.data_file_size; + self.index_file_count += other.index_file_count; + self.index_file_size += other.index_file_size; + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached (data_file_count, data_file_size) per manifest file full path. +type ManifestCache = Mutex<HashMap<String, (i64, i64)>>; + +/// Collect per-scope referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all snapshots from main branch, tags, and branches +/// 2. `"main"` — main branch snapshots + tag snapshots +/// 3. `"branch:<name>"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Manifest files that have been deleted by concurrent cleanup are gracefully +/// skipped (treated as contributing 0 files/bytes). +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, +) -> crate::Result<Vec<ReferencedFilesSummary>> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + + // 1. Main branch snapshots + tags + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let mut main_summary = + collect_scope_summary(file_io, &sm, "branch:main", manifest_cache_ref).await?; + + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + let tag_summary = collect_tag_scope_summary(file_io, &sm, &tm, manifest_cache_ref).await?; + main_summary.accumulate(&tag_summary); + + // 2. Branch summaries + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + let mut branch_summaries = Vec::new(); + for branch_name in &branch_names { + let branch_sm = sm.with_branch(branch_name); + let branch_summary = collect_scope_summary( + file_io, + &branch_sm, + &format!("branch:{branch_name}"), + manifest_cache_ref, + ) + .await?; + branch_summaries.push(branch_summary); + } + + // 3. Assemble output: total, main, branches + let mut total = ReferencedFilesSummary { + source: "total".to_string(), + ..Default::default() + }; + total.accumulate(&main_summary); + for bs in &branch_summaries { + total.accumulate(bs); + } + + let mut result = vec![total, main_summary]; + result.extend(branch_summaries); + Ok(result) +} + +async fn collect_scope_summary( + file_io: &FileIO, + sm: &SnapshotManager, + source: &str, + manifest_cache: &ManifestCache, +) -> crate::Result<ReferencedFilesSummary> { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec<Option<ReferencedFilesSummary>> = stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_summary(file_io, &sm, snapshot_id, manifest_cache).await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut summary = ReferencedFilesSummary { + source: source.to_string(), + ..Default::default() + }; + for s in per_snapshot.into_iter().flatten() { + summary.accumulate(&s); + } + Ok(summary) +} + +async fn collect_tag_scope_summary( + file_io: &FileIO, + sm: &SnapshotManager, + tm: &TagManager, + manifest_cache: &ManifestCache, +) -> crate::Result<ReferencedFilesSummary> { + let tag_names = tm.list_all_names().await?; + let mut summary = ReferencedFilesSummary::default(); + + for tag_name in &tag_names { + let snapshot = match tm.get(tag_name).await? { + Some(s) => s, + None => continue, + }; + if let Some(s) = collect_snapshot_summary(file_io, sm, &snapshot, manifest_cache).await? { + summary.accumulate(&s); + } + } + + Ok(summary) +} + +async fn collect_single_snapshot_summary( + file_io: &FileIO, + sm: &SnapshotManager, + snapshot_id: i64, + manifest_cache: &ManifestCache, +) -> crate::Result<Option<ReferencedFilesSummary>> { + let snapshot = match try_get_snapshot(sm, snapshot_id).await? { + Some(s) => s, + None => return Ok(None), + }; + + collect_snapshot_summary(file_io, sm, &snapshot, manifest_cache).await +} + +async fn collect_snapshot_summary( + file_io: &FileIO, + sm: &SnapshotManager, + snapshot: &crate::spec::Snapshot, + manifest_cache: &ManifestCache, +) -> crate::Result<Option<ReferencedFilesSummary>> { + let mut summary = ReferencedFilesSummary::default(); + + // Collect manifest list file names + let mut manifest_list_names = vec![ + snapshot.base_manifest_list().to_string(), + snapshot.delta_manifest_list().to_string(), + ]; + if let Some(cl) = snapshot.changelog_manifest_list() { + manifest_list_names.push(cl.to_string()); + } + + // Pre-compute paths so futures can borrow them + let manifest_list_paths: Vec<String> = manifest_list_names + .iter() + .map(|name| sm.manifest_path(name)) + .collect(); + + // Read all manifest lists concurrently + let manifest_list_futures: Vec<_> = manifest_list_paths + .iter() + .map(|path| try_read_manifest_list(file_io, path)) + .collect(); + let manifest_lists = try_join_all(manifest_list_futures).await?; + + // Flatten all manifest file metas from all manifest lists + let all_manifest_metas: Vec<&ManifestFileMeta> = + manifest_lists.iter().flat_map(|ml| ml.iter()).collect(); + + summary.manifest_file_count = all_manifest_metas.len() as i64; + summary.manifest_file_size = all_manifest_metas.iter().map(|m| m.file_size()).sum(); + + // Read manifest files to get data file stats, using cache by full path + let manifest_paths: Vec<String> = all_manifest_metas + .iter() + .map(|meta| sm.manifest_path(meta.file_name())) + .collect(); + + let uncached_indices: Vec<usize> = manifest_paths + .iter() + .enumerate() + .filter(|(_, path)| { + let cache = manifest_cache.lock().unwrap(); + !cache.contains_key(path.as_str()) + }) + .map(|(i, _)| i) + .collect(); + + // Only read manifests not yet in cache + if !uncached_indices.is_empty() { + let uncached_paths: Vec<&str> = uncached_indices + .iter() + .map(|&i| manifest_paths[i].as_str()) + .collect(); + + let manifest_futures: Vec<_> = uncached_paths + .iter() + .map(|path| try_read_manifest(file_io, path)) + .collect(); + let results = try_join_all(manifest_futures).await?; + + // Store results in cache + let mut cache = manifest_cache.lock().unwrap(); + for (path, entries) in uncached_paths.into_iter().zip(results) { Review Comment: Manifest entries carry `FileKind::Add/Delete`, and after overwrite/compaction the same data file can be present as an ADD from a historical base manifest and as a DELETE in the current delta manifest. Java's orphan cleaner uses a used-file-name set, while scan semantics first merge ADD/DELETE entries before reading live files; this code does neither and simply adds entry count and file size. Please define whether this function reports cleanup-protected referenced files or live readable files, then aggregate from a de-duplicated file set or from merged ADD/DELETE entries instead of raw entries. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
