This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new c954893 fix: classify physical files by path context (#341)
c954893 is described below
commit c954893b875b921db2b4a4e46047341fb464ab45
Author: Jiwen liu <[email protected]>
AuthorDate: Sun May 24 20:57:45 2026 +0800
fix: classify physical files by path context (#341)
---
.../src/system_tables/physical_files_size.rs | 3 +-
crates/paimon/src/table/referenced_files.rs | 302 +++++++++++++++++++--
docs/src/sql.md | 18 +-
3 files changed, 286 insertions(+), 37 deletions(-)
diff --git
a/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
index 72b3d9e..5380612 100644
--- a/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
+++ b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
@@ -82,7 +82,8 @@ impl TableProvider for PhysicalFilesSizeTable {
) -> DFResult<Arc<dyn ExecutionPlan>> {
let table = self.table.clone();
let summary = crate::runtime::await_with_runtime(async move {
- collect_physical_files_summary(table.file_io(),
table.location()).await
+ let partition_depth = table.schema().partition_keys().len();
+ collect_physical_files_summary(table.file_io(), table.location(),
partition_depth).await
})
.await
.map_err(to_datafusion_error)?;
diff --git a/crates/paimon/src/table/referenced_files.rs
b/crates/paimon/src/table/referenced_files.rs
index 2c41c67..a170b3c 100644
--- a/crates/paimon/src/table/referenced_files.rs
+++ b/crates/paimon/src/table/referenced_files.rs
@@ -614,25 +614,113 @@ pub struct PhysicalFilesSummary {
pub index_file_size: i64,
}
-/// Categorize a file name into a file type.
-/// Everything that is not a manifest/statistics or index file is classified
as data.
-fn classify_file_name(file_name: &str) -> FileType {
- if file_name.starts_with("manifest-")
- || file_name.starts_with("index-manifest-")
- || file_name.starts_with("statistics-")
- {
- FileType::Manifest
- } else if file_name.starts_with("index-") {
- FileType::Index
/// Category assigned to a file found while scanning the table directory.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhysicalFileKind {
    /// A direct child of `manifest/` with a manifest, manifest-list, or
    /// index-manifest file-name prefix.
    Manifest,
    /// A direct child of `statistics/`.
    Statistics,
    /// A file at `<partition dirs>/<bucket dir>/<file>`.
    Data,
    /// A direct child of `index/`.
    Index,
    /// Anything the classifier does not recognize.
    Other,
}
+
+fn table_relative_path<'a>(table_location: &str, path: &'a str) -> Option<&'a
str> {
+ let table_location = table_location.trim_end_matches('/');
+ if path == table_location {
+ Some("")
} else {
- FileType::Data
+ path.strip_prefix(table_location)
+ .and_then(|rest| rest.strip_prefix('/'))
}
}
-enum FileType {
- Manifest,
- Data,
- Index,
/// Returns `true` when `file_name` starts with one of the prefixes used for
/// manifest-directory files: manifest, manifest list, or index manifest.
fn is_manifest_file_name(file_name: &str) -> bool {
    // `manifest-list-` is already matched by `manifest-`; it is kept in the
    // table so every recognized kind is spelled out explicitly.
    const PREFIXES: [&str; 3] = ["manifest-", "manifest-list-", "index-manifest-"];
    PREFIXES.iter().any(|&prefix| file_name.starts_with(prefix))
}
+
/// Returns `true` for bucket directory names: either `bucket-postpone`, or
/// `bucket-` followed by one or more ASCII digits (e.g. `bucket-0`).
fn is_bucket_dir_name(segment: &str) -> bool {
    match segment.strip_prefix("bucket-") {
        Some("postpone") => true,
        Some(id) => !id.is_empty() && id.bytes().all(|b| b.is_ascii_digit()),
        None => false,
    }
}
+
/// Returns `true` when `segment` looks like a `key=value` partition directory:
/// it contains `=` and the text before the first `=` is non-empty. The value
/// part may be empty.
fn is_partition_segment(segment: &str) -> bool {
    segment.find('=').is_some_and(|eq_pos| eq_pos > 0)
}
+
+fn is_data_file_in_bucket(segments: &[&str], partition_depth: usize) -> bool {
+ if segments.len() != partition_depth + 2 {
+ return false;
+ }
+
+ segments[..partition_depth]
+ .iter()
+ .all(|segment| is_partition_segment(segment))
+ && is_bucket_dir_name(segments[partition_depth])
+ && !segments[partition_depth + 1].starts_with("index-")
+}
+
+fn is_data_file_in_data_dir(
+ relative_path: &str,
+ data_dir_relative_path: &str,
+ partition_depth: usize,
+) -> bool {
+ let data_dir_relative_path = data_dir_relative_path.trim_matches('/');
+ let data_relative_path = if data_dir_relative_path.is_empty() {
+ relative_path
+ } else {
+ let Some(rest) = relative_path
+ .strip_prefix(data_dir_relative_path)
+ .and_then(|rest| rest.strip_prefix('/'))
+ else {
+ return false;
+ };
+ rest
+ };
+ let segments = data_relative_path.split('/').collect::<Vec<_>>();
+ is_data_file_in_bucket(&segments, partition_depth)
+}
+
+fn classify_physical_path(
+ table_location: &str,
+ path: &str,
+ partition_depth: usize,
+ data_file_path_directory: Option<&str>,
+) -> PhysicalFileKind {
+ let Some(relative_path) = table_relative_path(table_location, path) else {
+ return PhysicalFileKind::Other;
+ };
+ let relative_path = relative_path.trim_matches('/');
+ if relative_path.is_empty() {
+ return PhysicalFileKind::Other;
+ }
+
+ let segments = relative_path.split('/').collect::<Vec<_>>();
+
+ match segments.as_slice() {
+ ["manifest", name] if is_manifest_file_name(name) =>
PhysicalFileKind::Manifest,
+ ["statistics", _] => PhysicalFileKind::Statistics,
+ ["index", _] => PhysicalFileKind::Index,
+ _ => {
+ if let Some(data_dir) = data_file_path_directory {
+ let data_dir = table_relative_path(table_location,
data_dir).unwrap_or(data_dir);
+ if is_data_file_in_data_dir(relative_path, data_dir,
partition_depth) {
+ PhysicalFileKind::Data
+ } else {
+ PhysicalFileKind::Other
+ }
+ } else if is_data_file_in_bucket(&segments, partition_depth) {
+ PhysicalFileKind::Data
+ } else {
+ PhysicalFileKind::Other
+ }
+ }
+ }
}
const DIR_LIST_CONCURRENCY: usize = 32;
@@ -643,13 +731,13 @@ const DIR_LIST_CONCURRENCY: usize = 32;
/// subdirectory recursively (up to 32 in parallel) to maximize throughput
/// on object stores with many partition directories.
///
-/// Files are classified by their file name prefix:
-/// - `manifest-*` / `index-manifest-*` → manifest
-/// - `index-*` (excluding `index-manifest-*`) → index
-/// - Everything else → data
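+/// Files are classified by their table-relative path. Only recognized Paimon
+/// metadata directories and partition/bucket data paths are counted; unknown
+/// files are ignored by this summary. `partition_depth` is the number of
+/// partition keys of the table. Data files are recognized only when they sit
+/// exactly `partition_depth` `key=value` directories deep, inside a bucket
+/// directory.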
pub async fn collect_physical_files_summary(
file_io: &FileIO,
table_location: &str,
+ partition_depth: usize,
) -> crate::Result<PhysicalFilesSummary> {
// List top-level entries to discover subdirectories and top-level files
let top_entries = match file_io.list_status(table_location).await {
@@ -670,8 +758,13 @@ pub async fn collect_physical_files_summary(
if entry.is_dir {
sub_dirs.push(entry.path.clone());
} else {
- let file_name =
entry.path.rsplit('/').next().unwrap_or(&entry.path);
- accumulate_file(&mut summary, file_name, entry.size);
+ accumulate_file(
+ &mut summary,
+ table_location,
+ &entry.path,
+ partition_depth,
+ entry.size,
+ );
}
}
@@ -695,28 +788,40 @@ pub async fn collect_physical_files_summary(
for result in dir_results {
let statuses = result?;
for status in &statuses {
- let file_name =
status.path.rsplit('/').next().unwrap_or(&status.path);
- accumulate_file(&mut summary, file_name, status.size);
+ accumulate_file(
+ &mut summary,
+ table_location,
+ &status.path,
+ partition_depth,
+ status.size,
+ );
}
}
Ok(summary)
}
-fn accumulate_file(summary: &mut PhysicalFilesSummary, file_name: &str, size:
u64) {
- match classify_file_name(file_name) {
- FileType::Manifest => {
+fn accumulate_file(
+ summary: &mut PhysicalFilesSummary,
+ table_location: &str,
+ path: &str,
+ partition_depth: usize,
+ size: u64,
+) {
+ match classify_physical_path(table_location, path, partition_depth, None) {
+ PhysicalFileKind::Manifest | PhysicalFileKind::Statistics => {
summary.manifest_file_count += 1;
summary.manifest_file_size += size as i64;
}
- FileType::Data => {
+ PhysicalFileKind::Data => {
summary.data_file_count += 1;
summary.data_file_size += size as i64;
}
- FileType::Index => {
+ PhysicalFileKind::Index => {
summary.index_file_count += 1;
summary.index_file_size += size as i64;
}
+ PhysicalFileKind::Other => {}
}
}
@@ -726,11 +831,21 @@ mod tests {
use crate::io::FileIOBuilder;
use crate::spec::{CommitKind, Snapshot};
use crate::table::{BranchManager, SnapshotManager, TagManager};
+ use bytes::Bytes;
fn test_file_io() -> FileIO {
FileIOBuilder::new("memory").build().unwrap()
}
+ async fn write_test_file(file_io: &FileIO, path: &str, content: &str) {
+ file_io
+ .new_output(path)
+ .unwrap()
+ .write(Bytes::from(content.to_string()))
+ .await
+ .unwrap();
+ }
+
#[tokio::test]
async fn test_collect_empty_table() {
let file_io = test_file_io();
@@ -788,6 +903,137 @@ mod tests {
assert_eq!(result[1].data_file_count, 0);
}
+ #[tokio::test]
+ async fn
test_physical_files_summary_uses_path_context_for_unpartitioned_table() {
+ let table_path = "memory:/test_physical_files_summary_path_context";
+ let file_io = test_file_io();
+
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/manifest/manifest-list-0"),
+ "manifest list",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/manifest/manifest-0"),
+ "manifest",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/manifest/index-manifest-0"),
+ "index manifest",
+ )
+ .await;
+ write_test_file(&file_io, &format!("{table_path}/index/index-0"),
"index").await;
+ write_test_file(&file_io, &format!("{table_path}/bucket-0/data-0"),
"data").await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/bucket-0/part-0.parquet"),
+ "data without prefix",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/bucket-0/index-should-not-be-data"),
+ "bucket index",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/bucket-postpone/data-u-0"),
+ "postpone data",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/nested/bucket-0/data-too-deep"),
+ "not a data bucket",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/data-root-file.parquet"),
+ "root data prefix",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/snapshot/snapshot-1"),
+ "snapshot",
+ )
+ .await;
+ write_test_file(&file_io, &format!("{table_path}/schema/schema-0"),
"schema").await;
+ write_test_file(&file_io, &format!("{table_path}/tag/tag-v1"),
"tag").await;
+ write_test_file(&file_io, &format!("{table_path}/_SUCCESS"),
"success").await;
+ write_test_file(&file_io, &format!("{table_path}/random-file"),
"random").await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/statistics/stat-0"),
+ "statistics",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/manifest/stat-0"),
+ "not classified by statistics prefix",
+ )
+ .await;
+
+ let result = collect_physical_files_summary(&file_io, table_path, 0)
+ .await
+ .unwrap();
+
+ assert_eq!(result.manifest_file_count, 4);
+ assert_eq!(result.index_file_count, 1);
+ assert_eq!(result.data_file_count, 3);
+ }
+
+ #[tokio::test]
+ async fn test_physical_files_summary_uses_partition_depth() {
+ let table_path = "memory:/test_physical_files_summary_partitioned";
+ let file_io = test_file_io();
+
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/dt=2026-05-21/bucket-0/part-0.parquet"),
+ "partition data",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/dt=2026-05-21/bucket-0/index-0"),
+ "partition bucket index",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/dt=2026-05-21/not-bucket/data-0"),
+ "not bucket",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/not_partition/bucket-0/data-0"),
+ "not partition",
+ )
+ .await;
+ write_test_file(
+ &file_io,
+ &format!("{table_path}/bucket-0/root-bucket-file"),
+ "wrong depth",
+ )
+ .await;
+
+ let result = collect_physical_files_summary(&file_io, table_path, 1)
+ .await
+ .unwrap();
+
+ assert_eq!(result.data_file_count, 1);
+ assert_eq!(result.index_file_count, 0);
+ }
+
#[tokio::test]
async fn test_branch_tag_referenced_files() {
use crate::spec::stats::BinaryTableStats;
diff --git a/docs/src/sql.md b/docs/src/sql.md
index 325d236..4a8dc4f 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -806,12 +806,14 @@ Columns:
### $physical_files_size
-Scan the table directory recursively and compute the total size of all
physical files on disk, categorized by file type. By comparing with
`$referenced_files_size`, you can identify orphan files that are no longer
referenced by any snapshot.
+Scan the table directory recursively and compute the total size of recognized physical files on disk, categorized by file type. This table is only a diagnostic size summary. Safely cleaning up orphan files requires a file-level list of candidates and retention checks, not just a difference between aggregate sizes.
-Files are classified by their file name prefix:
-- `manifest-*` / `index-manifest-*` → manifest
-- `index-*` (excluding `index-manifest-*`) → index
-- Everything else → data
+Files are classified by their table-relative path:
+- `manifest/manifest-*`, `manifest/manifest-list-*`, and `manifest/index-manifest-*` → manifest
+- `statistics/*` → counted in the manifest columns (the output schema has no separate statistics columns)
+- `index/*` → index
+- `<partition>/bucket-<N>/*` and `<partition>/bucket-postpone/*` → data. Here `<partition>` is one `key=value` directory per partition key (empty for unpartitioned tables). Files named `index-*` inside bucket directories are not counted as data.
+- unknown files are ignored by this summary
```sql
SELECT * FROM paimon.default.my_table$physical_files_size;
@@ -823,8 +825,8 @@ Columns:
|---|---|---|
| `manifest_file_count` | BIGINT | Number of manifest files on disk |
| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
-| `data_file_count` | BIGINT | Number of data files on disk |
-| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `data_file_count` | BIGINT | Number of recognized data files on disk |
+| `data_file_size` | BIGINT | Total size of recognized data files (bytes) |
| `index_file_count` | BIGINT | Number of index files on disk |
| `index_file_size` | BIGINT | Total size of index files (bytes) |
@@ -855,7 +857,7 @@ The output contains one row per scope:
- `branch:main` — main branch snapshots + tag snapshots
- `branch:<name>` — one row per other branch
-To identify orphan file size:
+To estimate possible orphan file size for recognized data files:
```sql
SELECT p.data_file_size - r.data_file_size AS orphan_data_size