This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b58d25  refactor(datafusion): convert files_size from table functions to system tables (#325)
2b58d25 is described below

commit 2b58d254e37af9eb2cca53c4a7a871f92ec0be32
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 11:15:35 2026 +0800

    refactor(datafusion): convert files_size from table functions to system tables (#325)
    
    File size summaries are table metadata, so they fit better as system tables
    (table$physical_files_size) than as UDTFs (physical_files_size('table')),
    consistent with $partitions, $snapshots, etc.
---
 crates/integrations/datafusion/src/lib.rs          |   4 -
 .../datafusion/src/system_tables/mod.rs            |   6 +
 .../src/{ => system_tables}/physical_files_size.rs |  74 +---------
 .../{ => system_tables}/referenced_files_size.rs   |  74 +---------
 docs/src/sql.md                                    | 158 ++++++++-------------
 5 files changed, 78 insertions(+), 238 deletions(-)

diff --git a/crates/integrations/datafusion/src/lib.rs 
b/crates/integrations/datafusion/src/lib.rs
index 1c83017..47f1bab 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -43,10 +43,8 @@ mod filter_pushdown;
 #[cfg(feature = "fulltext")]
 mod full_text_search;
 mod merge_into;
-mod physical_files_size;
 mod physical_plan;
 mod procedures;
-mod referenced_files_size;
 mod relation_planner;
 pub mod runtime;
 mod sql_context;
@@ -69,9 +67,7 @@ pub use catalog::{PaimonCatalogProvider, 
PaimonSchemaProvider};
 pub use error::to_datafusion_error;
 #[cfg(feature = "fulltext")]
 pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
-pub use physical_files_size::{register_physical_files_size, 
PhysicalFilesSizeFunction};
 pub use physical_plan::PaimonTableScan;
-pub use referenced_files_size::{register_referenced_files_size, 
ReferencedFilesSizeFunction};
 pub use relation_planner::PaimonRelationPlanner;
 pub use sql_context::SQLContext;
 pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs 
b/crates/integrations/datafusion/src/system_tables/mod.rs
index ddc10ef..34587f8 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -33,6 +33,8 @@ mod branches;
 mod manifests;
 mod options;
 mod partitions;
+mod physical_files_size;
+mod referenced_files_size;
 mod row_string_cast;
 mod schemas;
 mod snapshots;
@@ -47,6 +49,8 @@ const TABLES: &[(&str, Builder)] = &[
     ("branches", branches::build),
     ("manifests", manifests::build),
     ("options", options::build),
+    ("physical_files_size", physical_files_size::build),
+    ("referenced_files_size", referenced_files_size::build),
     ("schemas", schemas::build),
     ("snapshots", snapshots::build),
     ("tags", tags::build),
@@ -57,6 +61,8 @@ const SYSTEM_TABLE_NAMES: &[&str] = &[
     "manifests",
     "options",
     "partitions",
+    "physical_files_size",
+    "referenced_files_size",
     "schemas",
     "snapshots",
     "tags",
diff --git a/crates/integrations/datafusion/src/physical_files_size.rs 
b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
similarity index 60%
rename from crates/integrations/datafusion/src/physical_files_size.rs
rename to 
crates/integrations/datafusion/src/system_tables/physical_files_size.rs
index 14c3af4..72b3d9e 100644
--- a/crates/integrations/datafusion/src/physical_files_size.rs
+++ b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
@@ -15,89 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Table function that computes the total physical file sizes in the table 
directory.
-//!
-//! Usage: `SELECT * FROM physical_files_size('db.table_name')`
+//! Mirrors Java 
[PhysicalFilesSizeTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system).
 
 use std::any::Any;
-use std::fmt::Debug;
 use std::sync::{Arc, OnceLock};
 
 use async_trait::async_trait;
 use datafusion::arrow::array::{Int64Array, RecordBatch};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::catalog::Session;
-use datafusion::catalog::TableFunctionImpl;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use paimon::catalog::Catalog;
 use paimon::table::referenced_files::{collect_physical_files_summary, 
PhysicalFilesSummary};
 use paimon::table::Table;
 
 use crate::error::to_datafusion_error;
-use crate::runtime::{await_with_runtime, block_on_with_runtime};
-use crate::table_function_args::{extract_string_literal, 
parse_table_identifier};
 
-const FUNCTION_NAME: &str = "physical_files_size";
-
-pub fn register_physical_files_size(
-    ctx: &SessionContext,
-    catalog: Arc<dyn Catalog>,
-    default_database: &str,
-) {
-    ctx.register_udtf(
-        FUNCTION_NAME,
-        Arc::new(PhysicalFilesSizeFunction::new(catalog, default_database)),
-    );
-}
-
-pub struct PhysicalFilesSizeFunction {
-    catalog: Arc<dyn Catalog>,
-    default_database: String,
-}
-
-impl Debug for PhysicalFilesSizeFunction {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("PhysicalFilesSizeFunction")
-            .field("default_database", &self.default_database)
-            .finish()
-    }
-}
-
-impl PhysicalFilesSizeFunction {
-    pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
-        Self {
-            catalog,
-            default_database: default_database.to_string(),
-        }
-    }
-}
-
-impl TableFunctionImpl for PhysicalFilesSizeFunction {
-    fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
-        if args.len() != 1 {
-            return Err(datafusion::error::DataFusionError::Plan(
-                "physical_files_size requires 1 argument: 
(table_name)".to_string(),
-            ));
-        }
-
-        let table_name = extract_string_literal(FUNCTION_NAME, &args[0], 
"table_name")?;
-        let identifier =
-            parse_table_identifier(FUNCTION_NAME, &table_name, 
&self.default_database)?;
-
-        let catalog = Arc::clone(&self.catalog);
-        let table = block_on_with_runtime(
-            async move { catalog.get_table(&identifier).await },
-            "physical_files_size: catalog access thread panicked",
-        )
-        .map_err(to_datafusion_error)?;
-
-        Ok(Arc::new(PhysicalFilesSizeTableProvider { table }))
-    }
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    Ok(Arc::new(PhysicalFilesSizeTable { table }))
 }
 
 fn output_schema() -> SchemaRef {
@@ -117,12 +55,12 @@ fn output_schema() -> SchemaRef {
 }
 
 #[derive(Debug)]
-struct PhysicalFilesSizeTableProvider {
+struct PhysicalFilesSizeTable {
     table: Table,
 }
 
 #[async_trait]
-impl TableProvider for PhysicalFilesSizeTableProvider {
+impl TableProvider for PhysicalFilesSizeTable {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -143,7 +81,7 @@ impl TableProvider for PhysicalFilesSizeTableProvider {
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         let table = self.table.clone();
-        let summary = await_with_runtime(async move {
+        let summary = crate::runtime::await_with_runtime(async move {
             collect_physical_files_summary(table.file_io(), 
table.location()).await
         })
         .await
diff --git a/crates/integrations/datafusion/src/referenced_files_size.rs 
b/crates/integrations/datafusion/src/system_tables/referenced_files_size.rs
similarity index 65%
rename from crates/integrations/datafusion/src/referenced_files_size.rs
rename to 
crates/integrations/datafusion/src/system_tables/referenced_files_size.rs
index 387a538..cce5ed3 100644
--- a/crates/integrations/datafusion/src/referenced_files_size.rs
+++ b/crates/integrations/datafusion/src/system_tables/referenced_files_size.rs
@@ -15,89 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Table function that computes per-snapshot referenced file size summaries.
-//!
-//! Usage: `SELECT * FROM referenced_files_size('db.table_name')`
+//! Mirrors Java 
[ReferencedFilesSizeTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system).
 
 use std::any::Any;
-use std::fmt::Debug;
 use std::sync::{Arc, OnceLock};
 
 use async_trait::async_trait;
 use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::catalog::Session;
-use datafusion::catalog::TableFunctionImpl;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use paimon::catalog::Catalog;
 use paimon::table::referenced_files::{collect_referenced_files_summary, 
ReferencedFilesSummary};
 use paimon::table::Table;
 
 use crate::error::to_datafusion_error;
-use crate::runtime::{await_with_runtime, block_on_with_runtime};
-use crate::table_function_args::{extract_string_literal, 
parse_table_identifier};
-
-const FUNCTION_NAME: &str = "referenced_files_size";
-
-pub fn register_referenced_files_size(
-    ctx: &SessionContext,
-    catalog: Arc<dyn Catalog>,
-    default_database: &str,
-) {
-    ctx.register_udtf(
-        FUNCTION_NAME,
-        Arc::new(ReferencedFilesSizeFunction::new(catalog, default_database)),
-    );
-}
-
-pub struct ReferencedFilesSizeFunction {
-    catalog: Arc<dyn Catalog>,
-    default_database: String,
-}
-
-impl Debug for ReferencedFilesSizeFunction {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("ReferencedFilesSizeFunction")
-            .field("default_database", &self.default_database)
-            .finish()
-    }
-}
 
-impl ReferencedFilesSizeFunction {
-    pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
-        Self {
-            catalog,
-            default_database: default_database.to_string(),
-        }
-    }
-}
-
-impl TableFunctionImpl for ReferencedFilesSizeFunction {
-    fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
-        if args.len() != 1 {
-            return Err(datafusion::error::DataFusionError::Plan(
-                "referenced_files_size requires 1 argument: 
(table_name)".to_string(),
-            ));
-        }
-
-        let table_name = extract_string_literal(FUNCTION_NAME, &args[0], 
"table_name")?;
-        let identifier =
-            parse_table_identifier(FUNCTION_NAME, &table_name, 
&self.default_database)?;
-
-        let catalog = Arc::clone(&self.catalog);
-        let table = block_on_with_runtime(
-            async move { catalog.get_table(&identifier).await },
-            "referenced_files_size: catalog access thread panicked",
-        )
-        .map_err(to_datafusion_error)?;
-
-        Ok(Arc::new(ReferencedFilesSizeTableProvider { table }))
-    }
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+    Ok(Arc::new(ReferencedFilesSizeTable { table }))
 }
 
 fn output_schema() -> SchemaRef {
@@ -118,12 +56,12 @@ fn output_schema() -> SchemaRef {
 }
 
 #[derive(Debug)]
-struct ReferencedFilesSizeTableProvider {
+struct ReferencedFilesSizeTable {
     table: Table,
 }
 
 #[async_trait]
-impl TableProvider for ReferencedFilesSizeTableProvider {
+impl TableProvider for ReferencedFilesSizeTable {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -144,7 +82,7 @@ impl TableProvider for ReferencedFilesSizeTableProvider {
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         let table = self.table.clone();
-        let summaries = await_with_runtime(async move {
+        let summaries = crate::runtime::await_with_runtime(async move {
             let schema = table.schema();
             let partition_keys = schema.partition_keys();
             let partition_fields = schema.partition_fields();
diff --git a/docs/src/sql.md b/docs/src/sql.md
index bdad090..94eef71 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -537,104 +537,6 @@ SELECT * FROM full_text_search('paimon.my_db.docs', 
'content', 'paimon search',
 
 The function searches across all Tantivy full-text index files for the target column, merges results by relevance score, and returns the top-k matching rows. If no matching index is found, an empty result is returned.
 
-## Referenced Files Size
-
-The `referenced_files_size` table-valued function computes aggregated 
manifest/data/index file size summaries for all snapshots referenced by a 
table, including snapshots from the main branch, tags, and other branches. This 
is useful for understanding storage usage and for orphan file analysis.
-
-Historical snapshots may be in the process of being cleaned up — if a manifest 
file has already been deleted, it is gracefully skipped (counted as 0 
files/bytes).
-
-### Registration
-
-```rust
-use paimon_datafusion::register_referenced_files_size;
-
-register_referenced_files_size(&ctx, catalog.clone(), "default");
-```
-
-### Usage
-
-```sql
-SELECT * FROM referenced_files_size('table_name')
-```
-
-| Argument | Type | Description |
-|---|---|---|
-| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or 
short form |
-
-Example:
-
-```sql
-SELECT * FROM referenced_files_size('paimon.my_db.orders');
-```
-
-### Output Schema
-
-| Column | Type | Description |
-|---|---|---|
-| `source` | STRING | Scope: `total` or `branch:<name>` |
-| `manifest_file_count` | BIGINT | Number of manifest files |
-| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
-| `data_file_count` | BIGINT | Number of data files |
-| `data_file_size` | BIGINT | Total size of data files (bytes) |
-| `index_file_count` | BIGINT | Number of index files |
-| `index_file_size` | BIGINT | Total size of index files (bytes) |
-
-The output contains one row per scope:
-- `total` — sum across all branches and tags
-- `branch:main` — main branch snapshots + tag snapshots
-- `branch:<name>` — one row per other branch
-
-To get the total referenced size:
-
-```sql
-SELECT manifest_file_size + data_file_size + index_file_size AS total_size
-FROM referenced_files_size('paimon.my_db.orders')
-WHERE source = 'total';
-```
-
-## Physical Files Size
-
-The `physical_files_size` table-valued function scans the table directory 
recursively and computes the total size of all physical files on disk, 
categorized by file type. By comparing with `referenced_files_size`, you can 
identify orphan files that are no longer referenced by any snapshot.
-
-Files are classified by their file name prefix:
-- `manifest-*` / `index-manifest-*` → manifest
-- `index-*` (excluding `index-manifest-*`) → index
-- Everything else → data
-
-### Registration
-
-```rust
-use paimon_datafusion::register_physical_files_size;
-
-register_physical_files_size(&ctx, catalog.clone(), "default");
-```
-
-### Usage
-
-```sql
-SELECT * FROM physical_files_size('table_name')
-```
-
-| Argument | Type | Description |
-|---|---|---|
-| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or 
short form |
-
-Example:
-
-```sql
-SELECT * FROM physical_files_size('paimon.my_db.orders');
-```
-
-### Output Schema
-
-| Column | Type | Description |
-|---|---|---|
-| `manifest_file_count` | BIGINT | Number of manifest files on disk |
-| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
-| `data_file_count` | BIGINT | Number of data files on disk |
-| `data_file_size` | BIGINT | Total size of data files (bytes) |
-| `index_file_count` | BIGINT | Number of index files on disk |
-| `index_file_size` | BIGINT | Total size of index files (bytes) |
 
 ## Time Travel
 
@@ -894,6 +796,66 @@ Columns:
 | `total_buckets` | INT | Total bucket count for the partition (0 unless catalog-tracked) |
 | `done` | BOOLEAN | Whether the partition is marked done (false unless catalog-tracked) |
 
+### $physical_files_size
+
+Scan the table directory recursively and compute the total size of all physical files on disk, categorized by file type. By comparing the result with `$referenced_files_size`, you can identify orphan files that are no longer referenced by any snapshot.
+
+Files are classified by their file name prefix:
+- `manifest-*` / `index-manifest-*` → manifest
+- `index-*` (excluding `index-manifest-*`) → index
+- Everything else → data
+
+```sql
+SELECT * FROM paimon.default.my_table$physical_files_size;
+```
+
+Columns:
+
+| Column | Type | Description |
+|---|---|---|
+| `manifest_file_count` | BIGINT | Number of manifest files on disk |
+| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
+| `data_file_count` | BIGINT | Number of data files on disk |
+| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `index_file_count` | BIGINT | Number of index files on disk |
+| `index_file_size` | BIGINT | Total size of index files (bytes) |
+
+### $referenced_files_size
+
+Compute aggregated manifest/data/index file size summaries for all snapshots referenced by a table, including snapshots from the main branch, tags, and other branches. This is useful for understanding storage usage and for orphan file analysis.
+
+Historical snapshots may be in the process of being cleaned up — if a manifest file has already been deleted, it is gracefully skipped (counted as 0 files/bytes).
+
+```sql
+SELECT * FROM paimon.default.my_table$referenced_files_size;
+```
+
+Columns:
+
+| Column | Type | Description |
+|---|---|---|
+| `source` | STRING | Scope: `total` or `branch:<name>` |
+| `manifest_file_count` | BIGINT | Number of manifest files |
+| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
+| `data_file_count` | BIGINT | Number of data files |
+| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `index_file_count` | BIGINT | Number of index files |
+| `index_file_size` | BIGINT | Total size of index files (bytes) |
+
+The output contains one row per scope:
+- `total` — sum across all branches and tags
+- `branch:main` — main branch snapshots + tag snapshots
+- `branch:<name>` — one row per other branch
+
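+To estimate the size of orphan data files (note that `$physical_files_size` counts every file that is not a manifest or index file — for example, snapshot and schema metadata files — as data, so the result is an approximation):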
+
+```sql
+SELECT p.data_file_size - r.data_file_size AS orphan_data_size
+FROM paimon.default.my_table$physical_files_size p,
+     paimon.default.my_table$referenced_files_size r
+WHERE r.source = 'total';
+```
+
 ### Branch References
 
 System tables support branch syntax:

Reply via email to