This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new c954893  fix: classify physical files by path context (#341)
c954893 is described below

commit c954893b875b921db2b4a4e46047341fb464ab45
Author: Jiwen liu <[email protected]>
AuthorDate: Sun May 24 20:57:45 2026 +0800

    fix: classify physical files by path context (#341)
---
 .../src/system_tables/physical_files_size.rs       |   3 +-
 crates/paimon/src/table/referenced_files.rs        | 302 +++++++++++++++++++--
 docs/src/sql.md                                    |  18 +-
 3 files changed, 286 insertions(+), 37 deletions(-)

diff --git a/crates/integrations/datafusion/src/system_tables/physical_files_size.rs b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
index 72b3d9e..5380612 100644
--- a/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
+++ b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
@@ -82,7 +82,8 @@ impl TableProvider for PhysicalFilesSizeTable {
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         let table = self.table.clone();
         let summary = crate::runtime::await_with_runtime(async move {
-            collect_physical_files_summary(table.file_io(), table.location()).await
+            let partition_depth = table.schema().partition_keys().len();
+            collect_physical_files_summary(table.file_io(), table.location(), partition_depth).await
         })
         .await
         .map_err(to_datafusion_error)?;
diff --git a/crates/paimon/src/table/referenced_files.rs b/crates/paimon/src/table/referenced_files.rs
index 2c41c67..a170b3c 100644
--- a/crates/paimon/src/table/referenced_files.rs
+++ b/crates/paimon/src/table/referenced_files.rs
@@ -614,25 +614,113 @@ pub struct PhysicalFilesSummary {
     pub index_file_size: i64,
 }
 
-/// Categorize a file name into a file type.
-/// Everything that is not a manifest/statistics or index file is classified as data.
-fn classify_file_name(file_name: &str) -> FileType {
-    if file_name.starts_with("manifest-")
-        || file_name.starts_with("index-manifest-")
-        || file_name.starts_with("statistics-")
-    {
-        FileType::Manifest
-    } else if file_name.starts_with("index-") {
-        FileType::Index
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum PhysicalFileKind {
+    Manifest,
+    Statistics,
+    Data,
+    Index,
+    Other,
+}
+
+fn table_relative_path<'a>(table_location: &str, path: &'a str) -> Option<&'a str> {
+    let table_location = table_location.trim_end_matches('/');
+    if path == table_location {
+        Some("")
     } else {
-        FileType::Data
+        path.strip_prefix(table_location)
+            .and_then(|rest| rest.strip_prefix('/'))
     }
 }
 
-enum FileType {
-    Manifest,
-    Data,
-    Index,
+fn is_manifest_file_name(file_name: &str) -> bool {
+    file_name.starts_with("manifest-")
+        || file_name.starts_with("manifest-list-")
+        || file_name.starts_with("index-manifest-")
+}
+
+fn is_bucket_dir_name(segment: &str) -> bool {
+    segment == "bucket-postpone"
+        || segment
+            .strip_prefix("bucket-")
+            .is_some_and(|bucket| !bucket.is_empty() && bucket.chars().all(|c| c.is_ascii_digit()))
+}
+
+fn is_partition_segment(segment: &str) -> bool {
+    let Some((key, _value)) = segment.split_once('=') else {
+        return false;
+    };
+    !key.is_empty()
+}
+
+fn is_data_file_in_bucket(segments: &[&str], partition_depth: usize) -> bool {
+    if segments.len() != partition_depth + 2 {
+        return false;
+    }
+
+    segments[..partition_depth]
+        .iter()
+        .all(|segment| is_partition_segment(segment))
+        && is_bucket_dir_name(segments[partition_depth])
+        && !segments[partition_depth + 1].starts_with("index-")
+}
+
+fn is_data_file_in_data_dir(
+    relative_path: &str,
+    data_dir_relative_path: &str,
+    partition_depth: usize,
+) -> bool {
+    let data_dir_relative_path = data_dir_relative_path.trim_matches('/');
+    let data_relative_path = if data_dir_relative_path.is_empty() {
+        relative_path
+    } else {
+        let Some(rest) = relative_path
+            .strip_prefix(data_dir_relative_path)
+            .and_then(|rest| rest.strip_prefix('/'))
+        else {
+            return false;
+        };
+        rest
+    };
+    let segments = data_relative_path.split('/').collect::<Vec<_>>();
+    is_data_file_in_bucket(&segments, partition_depth)
+}
+
+fn classify_physical_path(
+    table_location: &str,
+    path: &str,
+    partition_depth: usize,
+    data_file_path_directory: Option<&str>,
+) -> PhysicalFileKind {
+    let Some(relative_path) = table_relative_path(table_location, path) else {
+        return PhysicalFileKind::Other;
+    };
+    let relative_path = relative_path.trim_matches('/');
+    if relative_path.is_empty() {
+        return PhysicalFileKind::Other;
+    }
+
+    let segments = relative_path.split('/').collect::<Vec<_>>();
+
+    match segments.as_slice() {
+        ["manifest", name] if is_manifest_file_name(name) => PhysicalFileKind::Manifest,
+        ["statistics", _] => PhysicalFileKind::Statistics,
+        ["index", _] => PhysicalFileKind::Index,
+        _ => {
+            if let Some(data_dir) = data_file_path_directory {
+                let data_dir = table_relative_path(table_location, data_dir).unwrap_or(data_dir);
+                if is_data_file_in_data_dir(relative_path, data_dir, partition_depth) {
+                    PhysicalFileKind::Data
+                } else {
+                    PhysicalFileKind::Other
+                }
+            } else if is_data_file_in_bucket(&segments, partition_depth) {
+                PhysicalFileKind::Data
+            } else {
+                PhysicalFileKind::Other
+            }
+        }
+    }
 }
 
 const DIR_LIST_CONCURRENCY: usize = 32;
@@ -643,13 +731,13 @@ const DIR_LIST_CONCURRENCY: usize = 32;
 /// subdirectory recursively (up to 32 in parallel) to maximize throughput
 /// on object stores with many partition directories.
 ///
-/// Files are classified by their file name prefix:
-/// - `manifest-*` / `index-manifest-*` → manifest
-/// - `index-*` (excluding `index-manifest-*`) → index
-/// - Everything else → data
+/// Files are classified by their table-relative path. Only recognized Paimon
+/// metadata directories and partition/bucket data paths are counted; unknown
+/// files are ignored by this summary.
 pub async fn collect_physical_files_summary(
     file_io: &FileIO,
     table_location: &str,
+    partition_depth: usize,
 ) -> crate::Result<PhysicalFilesSummary> {
     // List top-level entries to discover subdirectories and top-level files
     let top_entries = match file_io.list_status(table_location).await {
@@ -670,8 +758,13 @@ pub async fn collect_physical_files_summary(
         if entry.is_dir {
             sub_dirs.push(entry.path.clone());
         } else {
-            let file_name = entry.path.rsplit('/').next().unwrap_or(&entry.path);
-            accumulate_file(&mut summary, file_name, entry.size);
+            accumulate_file(
+                &mut summary,
+                table_location,
+                &entry.path,
+                partition_depth,
+                entry.size,
+            );
         }
     }
 
@@ -695,28 +788,40 @@ pub async fn collect_physical_files_summary(
     for result in dir_results {
         let statuses = result?;
         for status in &statuses {
-            let file_name = status.path.rsplit('/').next().unwrap_or(&status.path);
-            accumulate_file(&mut summary, file_name, status.size);
+            accumulate_file(
+                &mut summary,
+                table_location,
+                &status.path,
+                partition_depth,
+                status.size,
+            );
         }
     }
 
     Ok(summary)
 }
 
-fn accumulate_file(summary: &mut PhysicalFilesSummary, file_name: &str, size: u64) {
-    match classify_file_name(file_name) {
-        FileType::Manifest => {
+fn accumulate_file(
+    summary: &mut PhysicalFilesSummary,
+    table_location: &str,
+    path: &str,
+    partition_depth: usize,
+    size: u64,
+) {
+    match classify_physical_path(table_location, path, partition_depth, None) {
+        PhysicalFileKind::Manifest | PhysicalFileKind::Statistics => {
             summary.manifest_file_count += 1;
             summary.manifest_file_size += size as i64;
         }
-        FileType::Data => {
+        PhysicalFileKind::Data => {
             summary.data_file_count += 1;
             summary.data_file_size += size as i64;
         }
-        FileType::Index => {
+        PhysicalFileKind::Index => {
             summary.index_file_count += 1;
             summary.index_file_size += size as i64;
         }
+        PhysicalFileKind::Other => {}
     }
 }
 
@@ -726,11 +831,21 @@ mod tests {
     use crate::io::FileIOBuilder;
     use crate::spec::{CommitKind, Snapshot};
     use crate::table::{BranchManager, SnapshotManager, TagManager};
+    use bytes::Bytes;
 
     fn test_file_io() -> FileIO {
         FileIOBuilder::new("memory").build().unwrap()
     }
 
+    async fn write_test_file(file_io: &FileIO, path: &str, content: &str) {
+        file_io
+            .new_output(path)
+            .unwrap()
+            .write(Bytes::from(content.to_string()))
+            .await
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_collect_empty_table() {
         let file_io = test_file_io();
@@ -788,6 +903,137 @@ mod tests {
         assert_eq!(result[1].data_file_count, 0);
     }
 
+    #[tokio::test]
+    async fn test_physical_files_summary_uses_path_context_for_unpartitioned_table() {
+        let table_path = "memory:/test_physical_files_summary_path_context";
+        let file_io = test_file_io();
+
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/manifest/manifest-list-0"),
+            "manifest list",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/manifest/manifest-0"),
+            "manifest",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/manifest/index-manifest-0"),
+            "index manifest",
+        )
+        .await;
+        write_test_file(&file_io, &format!("{table_path}/index/index-0"), "index").await;
+        write_test_file(&file_io, &format!("{table_path}/bucket-0/data-0"), "data").await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/bucket-0/part-0.parquet"),
+            "data without prefix",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/bucket-0/index-should-not-be-data"),
+            "bucket index",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/bucket-postpone/data-u-0"),
+            "postpone data",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/nested/bucket-0/data-too-deep"),
+            "not a data bucket",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/data-root-file.parquet"),
+            "root data prefix",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/snapshot/snapshot-1"),
+            "snapshot",
+        )
+        .await;
+        write_test_file(&file_io, &format!("{table_path}/schema/schema-0"), "schema").await;
+        write_test_file(&file_io, &format!("{table_path}/tag/tag-v1"), "tag").await;
+        write_test_file(&file_io, &format!("{table_path}/_SUCCESS"), "success").await;
+        write_test_file(&file_io, &format!("{table_path}/random-file"), "random").await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/statistics/stat-0"),
+            "statistics",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/manifest/stat-0"),
+            "not classified by statistics prefix",
+        )
+        .await;
+
+        let result = collect_physical_files_summary(&file_io, table_path, 0)
+            .await
+            .unwrap();
+
+        assert_eq!(result.manifest_file_count, 4);
+        assert_eq!(result.index_file_count, 1);
+        assert_eq!(result.data_file_count, 3);
+    }
+
+    #[tokio::test]
+    async fn test_physical_files_summary_uses_partition_depth() {
+        let table_path = "memory:/test_physical_files_summary_partitioned";
+        let file_io = test_file_io();
+
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/dt=2026-05-21/bucket-0/part-0.parquet"),
+            "partition data",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/dt=2026-05-21/bucket-0/index-0"),
+            "partition bucket index",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/dt=2026-05-21/not-bucket/data-0"),
+            "not bucket",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/not_partition/bucket-0/data-0"),
+            "not partition",
+        )
+        .await;
+        write_test_file(
+            &file_io,
+            &format!("{table_path}/bucket-0/root-bucket-file"),
+            "wrong depth",
+        )
+        .await;
+
+        let result = collect_physical_files_summary(&file_io, table_path, 1)
+            .await
+            .unwrap();
+
+        assert_eq!(result.data_file_count, 1);
+        assert_eq!(result.index_file_count, 0);
+    }
+
     #[tokio::test]
     async fn test_branch_tag_referenced_files() {
         use crate::spec::stats::BinaryTableStats;
diff --git a/docs/src/sql.md b/docs/src/sql.md
index 325d236..4a8dc4f 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -806,12 +806,14 @@ Columns:
 
 ### $physical_files_size
 
-Scan the table directory recursively and compute the total size of all physical files on disk, categorized by file type. By comparing with `$referenced_files_size`, you can identify orphan files that are no longer referenced by any snapshot.
+Scan the table directory recursively and compute the total size of recognized physical files on disk, categorized by file type. This table is a diagnostic size summary; orphan cleanup needs file-level candidates and retention checks, not just aggregate size differences.
 
-Files are classified by their file name prefix:
-- `manifest-*` / `index-manifest-*` → manifest
-- `index-*` (excluding `index-manifest-*`) → index
-- Everything else → data
+Files are classified by their table-relative path:
+- `manifest/manifest-*`, `manifest/manifest-list-*`, and `manifest/index-manifest-*` → manifest
+- `statistics/*` → manifest file counters for the current compatible output schema
+- `index/*` → index
+- `<partition>/bucket-*/*` and `<partition>/bucket-postpone/*` → data, using the table's partition depth
+- unknown files are ignored by this summary
 
 ```sql
 SELECT * FROM paimon.default.my_table$physical_files_size;
@@ -823,8 +825,8 @@ Columns:
 |---|---|---|
 | `manifest_file_count` | BIGINT | Number of manifest files on disk |
 | `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
-| `data_file_count` | BIGINT | Number of data files on disk |
-| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `data_file_count` | BIGINT | Number of recognized data files on disk |
+| `data_file_size` | BIGINT | Total size of recognized data files (bytes) |
 | `index_file_count` | BIGINT | Number of index files on disk |
 | `index_file_size` | BIGINT | Total size of index files (bytes) |
 
@@ -855,7 +857,7 @@ The output contains one row per scope:
 - `branch:main` — main branch snapshots + tag snapshots
 - `branch:<name>` — one row per other branch
 
-To identify orphan file size:
+To estimate possible orphan file size for recognized data files:
 
 ```sql
 SELECT p.data_file_size - r.data_file_size AS orphan_data_size

Reply via email to