This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2b58d25 refactor(datafusion): convert files_size from table functions
to system tables (#325)
2b58d25 is described below
commit 2b58d254e37af9eb2cca53c4a7a871f92ec0be32
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue May 19 11:15:35 2026 +0800
refactor(datafusion): convert files_size from table functions to system
tables (#325)
Both expose table metadata, so they fit better as system tables
(table$physical_files_size, table$referenced_files_size) than as UDTFs
(physical_files_size('table')), consistent with $partitions, $snapshots, etc.
---
crates/integrations/datafusion/src/lib.rs | 4 -
.../datafusion/src/system_tables/mod.rs | 6 +
.../src/{ => system_tables}/physical_files_size.rs | 74 +---------
.../{ => system_tables}/referenced_files_size.rs | 74 +---------
docs/src/sql.md | 158 ++++++++-------------
5 files changed, 78 insertions(+), 238 deletions(-)
diff --git a/crates/integrations/datafusion/src/lib.rs
b/crates/integrations/datafusion/src/lib.rs
index 1c83017..47f1bab 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -43,10 +43,8 @@ mod filter_pushdown;
#[cfg(feature = "fulltext")]
mod full_text_search;
mod merge_into;
-mod physical_files_size;
mod physical_plan;
mod procedures;
-mod referenced_files_size;
mod relation_planner;
pub mod runtime;
mod sql_context;
@@ -69,9 +67,7 @@ pub use catalog::{PaimonCatalogProvider,
PaimonSchemaProvider};
pub use error::to_datafusion_error;
#[cfg(feature = "fulltext")]
pub use full_text_search::{register_full_text_search, FullTextSearchFunction};
-pub use physical_files_size::{register_physical_files_size,
PhysicalFilesSizeFunction};
pub use physical_plan::PaimonTableScan;
-pub use referenced_files_size::{register_referenced_files_size,
ReferencedFilesSizeFunction};
pub use relation_planner::PaimonRelationPlanner;
pub use sql_context::SQLContext;
pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs
b/crates/integrations/datafusion/src/system_tables/mod.rs
index ddc10ef..34587f8 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -33,6 +33,8 @@ mod branches;
mod manifests;
mod options;
mod partitions;
+mod physical_files_size;
+mod referenced_files_size;
mod row_string_cast;
mod schemas;
mod snapshots;
@@ -47,6 +49,8 @@ const TABLES: &[(&str, Builder)] = &[
("branches", branches::build),
("manifests", manifests::build),
("options", options::build),
+ ("physical_files_size", physical_files_size::build),
+ ("referenced_files_size", referenced_files_size::build),
("schemas", schemas::build),
("snapshots", snapshots::build),
("tags", tags::build),
@@ -57,6 +61,8 @@ const SYSTEM_TABLE_NAMES: &[&str] = &[
"manifests",
"options",
"partitions",
+ "physical_files_size",
+ "referenced_files_size",
"schemas",
"snapshots",
"tags",
diff --git a/crates/integrations/datafusion/src/physical_files_size.rs
b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
similarity index 60%
rename from crates/integrations/datafusion/src/physical_files_size.rs
rename to
crates/integrations/datafusion/src/system_tables/physical_files_size.rs
index 14c3af4..72b3d9e 100644
--- a/crates/integrations/datafusion/src/physical_files_size.rs
+++ b/crates/integrations/datafusion/src/system_tables/physical_files_size.rs
@@ -15,89 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-//! Table function that computes the total physical file sizes in the table
directory.
-//!
-//! Usage: `SELECT * FROM physical_files_size('db.table_name')`
+//! Mirrors Java [PhysicalFilesSizeTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system).
use std::any::Any;
-use std::fmt::Debug;
use std::sync::{Arc, OnceLock};
use async_trait::async_trait;
use datafusion::arrow::array::{Int64Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::Session;
-use datafusion::catalog::TableFunctionImpl;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use paimon::catalog::Catalog;
use paimon::table::referenced_files::{collect_physical_files_summary,
PhysicalFilesSummary};
use paimon::table::Table;
use crate::error::to_datafusion_error;
-use crate::runtime::{await_with_runtime, block_on_with_runtime};
-use crate::table_function_args::{extract_string_literal,
parse_table_identifier};
-const FUNCTION_NAME: &str = "physical_files_size";
-
-pub fn register_physical_files_size(
- ctx: &SessionContext,
- catalog: Arc<dyn Catalog>,
- default_database: &str,
-) {
- ctx.register_udtf(
- FUNCTION_NAME,
- Arc::new(PhysicalFilesSizeFunction::new(catalog, default_database)),
- );
-}
-
-pub struct PhysicalFilesSizeFunction {
- catalog: Arc<dyn Catalog>,
- default_database: String,
-}
-
-impl Debug for PhysicalFilesSizeFunction {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("PhysicalFilesSizeFunction")
- .field("default_database", &self.default_database)
- .finish()
- }
-}
-
-impl PhysicalFilesSizeFunction {
- pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
- Self {
- catalog,
- default_database: default_database.to_string(),
- }
- }
-}
-
-impl TableFunctionImpl for PhysicalFilesSizeFunction {
- fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
- if args.len() != 1 {
- return Err(datafusion::error::DataFusionError::Plan(
- "physical_files_size requires 1 argument:
(table_name)".to_string(),
- ));
- }
-
- let table_name = extract_string_literal(FUNCTION_NAME, &args[0],
"table_name")?;
- let identifier =
- parse_table_identifier(FUNCTION_NAME, &table_name,
&self.default_database)?;
-
- let catalog = Arc::clone(&self.catalog);
- let table = block_on_with_runtime(
- async move { catalog.get_table(&identifier).await },
- "physical_files_size: catalog access thread panicked",
- )
- .map_err(to_datafusion_error)?;
-
- Ok(Arc::new(PhysicalFilesSizeTableProvider { table }))
- }
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+ Ok(Arc::new(PhysicalFilesSizeTable { table }))
}
fn output_schema() -> SchemaRef {
@@ -117,12 +55,12 @@ fn output_schema() -> SchemaRef {
}
#[derive(Debug)]
-struct PhysicalFilesSizeTableProvider {
+struct PhysicalFilesSizeTable {
table: Table,
}
#[async_trait]
-impl TableProvider for PhysicalFilesSizeTableProvider {
+impl TableProvider for PhysicalFilesSizeTable {
fn as_any(&self) -> &dyn Any {
self
}
@@ -143,7 +81,7 @@ impl TableProvider for PhysicalFilesSizeTableProvider {
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let table = self.table.clone();
- let summary = await_with_runtime(async move {
+ let summary = crate::runtime::await_with_runtime(async move {
collect_physical_files_summary(table.file_io(),
table.location()).await
})
.await
diff --git a/crates/integrations/datafusion/src/referenced_files_size.rs
b/crates/integrations/datafusion/src/system_tables/referenced_files_size.rs
similarity index 65%
rename from crates/integrations/datafusion/src/referenced_files_size.rs
rename to
crates/integrations/datafusion/src/system_tables/referenced_files_size.rs
index 387a538..cce5ed3 100644
--- a/crates/integrations/datafusion/src/referenced_files_size.rs
+++ b/crates/integrations/datafusion/src/system_tables/referenced_files_size.rs
@@ -15,89 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-//! Table function that computes per-snapshot referenced file size summaries.
-//!
-//! Usage: `SELECT * FROM referenced_files_size('db.table_name')`
+//! Mirrors Java [ReferencedFilesSizeTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system).
use std::any::Any;
-use std::fmt::Debug;
use std::sync::{Arc, OnceLock};
use async_trait::async_trait;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::Session;
-use datafusion::catalog::TableFunctionImpl;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use paimon::catalog::Catalog;
use paimon::table::referenced_files::{collect_referenced_files_summary,
ReferencedFilesSummary};
use paimon::table::Table;
use crate::error::to_datafusion_error;
-use crate::runtime::{await_with_runtime, block_on_with_runtime};
-use crate::table_function_args::{extract_string_literal,
parse_table_identifier};
-
-const FUNCTION_NAME: &str = "referenced_files_size";
-
-pub fn register_referenced_files_size(
- ctx: &SessionContext,
- catalog: Arc<dyn Catalog>,
- default_database: &str,
-) {
- ctx.register_udtf(
- FUNCTION_NAME,
- Arc::new(ReferencedFilesSizeFunction::new(catalog, default_database)),
- );
-}
-
-pub struct ReferencedFilesSizeFunction {
- catalog: Arc<dyn Catalog>,
- default_database: String,
-}
-
-impl Debug for ReferencedFilesSizeFunction {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("ReferencedFilesSizeFunction")
- .field("default_database", &self.default_database)
- .finish()
- }
-}
-impl ReferencedFilesSizeFunction {
- pub fn new(catalog: Arc<dyn Catalog>, default_database: &str) -> Self {
- Self {
- catalog,
- default_database: default_database.to_string(),
- }
- }
-}
-
-impl TableFunctionImpl for ReferencedFilesSizeFunction {
- fn call(&self, args: &[Expr]) -> DFResult<Arc<dyn TableProvider>> {
- if args.len() != 1 {
- return Err(datafusion::error::DataFusionError::Plan(
- "referenced_files_size requires 1 argument:
(table_name)".to_string(),
- ));
- }
-
- let table_name = extract_string_literal(FUNCTION_NAME, &args[0],
"table_name")?;
- let identifier =
- parse_table_identifier(FUNCTION_NAME, &table_name,
&self.default_database)?;
-
- let catalog = Arc::clone(&self.catalog);
- let table = block_on_with_runtime(
- async move { catalog.get_table(&identifier).await },
- "referenced_files_size: catalog access thread panicked",
- )
- .map_err(to_datafusion_error)?;
-
- Ok(Arc::new(ReferencedFilesSizeTableProvider { table }))
- }
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+ Ok(Arc::new(ReferencedFilesSizeTable { table }))
}
fn output_schema() -> SchemaRef {
@@ -118,12 +56,12 @@ fn output_schema() -> SchemaRef {
}
#[derive(Debug)]
-struct ReferencedFilesSizeTableProvider {
+struct ReferencedFilesSizeTable {
table: Table,
}
#[async_trait]
-impl TableProvider for ReferencedFilesSizeTableProvider {
+impl TableProvider for ReferencedFilesSizeTable {
fn as_any(&self) -> &dyn Any {
self
}
@@ -144,7 +82,7 @@ impl TableProvider for ReferencedFilesSizeTableProvider {
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let table = self.table.clone();
- let summaries = await_with_runtime(async move {
+ let summaries = crate::runtime::await_with_runtime(async move {
let schema = table.schema();
let partition_keys = schema.partition_keys();
let partition_fields = schema.partition_fields();
diff --git a/docs/src/sql.md b/docs/src/sql.md
index bdad090..94eef71 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -537,104 +537,6 @@ SELECT * FROM full_text_search('paimon.my_db.docs',
'content', 'paimon search',
The function searches across all Tantivy full-text index files for the target
column, merges results by relevance score, and returns the top-k matching rows.
If no matching index is found, an empty result is returned.
-## Referenced Files Size
-
-The `referenced_files_size` table-valued function computes aggregated
manifest/data/index file size summaries for all snapshots referenced by a
table, including snapshots from the main branch, tags, and other branches. This
is useful for understanding storage usage and for orphan file analysis.
-
-Historical snapshots may be in the process of being cleaned up — if a manifest
file has already been deleted, it is gracefully skipped (counted as 0
files/bytes).
-
-### Registration
-
-```rust
-use paimon_datafusion::register_referenced_files_size;
-
-register_referenced_files_size(&ctx, catalog.clone(), "default");
-```
-
-### Usage
-
-```sql
-SELECT * FROM referenced_files_size('table_name')
-```
-
-| Argument | Type | Description |
-|---|---|---|
-| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or
short form |
-
-Example:
-
-```sql
-SELECT * FROM referenced_files_size('paimon.my_db.orders');
-```
-
-### Output Schema
-
-| Column | Type | Description |
-|---|---|---|
-| `source` | STRING | Scope: `total` or `branch:<name>` |
-| `manifest_file_count` | BIGINT | Number of manifest files |
-| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
-| `data_file_count` | BIGINT | Number of data files |
-| `data_file_size` | BIGINT | Total size of data files (bytes) |
-| `index_file_count` | BIGINT | Number of index files |
-| `index_file_size` | BIGINT | Total size of index files (bytes) |
-
-The output contains one row per scope:
-- `total` — sum across all branches and tags
-- `branch:main` — main branch snapshots + tag snapshots
-- `branch:<name>` — one row per other branch
-
-To get the total referenced size:
-
-```sql
-SELECT manifest_file_size + data_file_size + index_file_size AS total_size
-FROM referenced_files_size('paimon.my_db.orders')
-WHERE source = 'total';
-```
-
-## Physical Files Size
-
-The `physical_files_size` table-valued function scans the table directory
recursively and computes the total size of all physical files on disk,
categorized by file type. By comparing with `referenced_files_size`, you can
identify orphan files that are no longer referenced by any snapshot.
-
-Files are classified by their file name prefix:
-- `manifest-*` / `index-manifest-*` → manifest
-- `index-*` (excluding `index-manifest-*`) → index
-- Everything else → data
-
-### Registration
-
-```rust
-use paimon_datafusion::register_physical_files_size;
-
-register_physical_files_size(&ctx, catalog.clone(), "default");
-```
-
-### Usage
-
-```sql
-SELECT * FROM physical_files_size('table_name')
-```
-
-| Argument | Type | Description |
-|---|---|---|
-| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or
short form |
-
-Example:
-
-```sql
-SELECT * FROM physical_files_size('paimon.my_db.orders');
-```
-
-### Output Schema
-
-| Column | Type | Description |
-|---|---|---|
-| `manifest_file_count` | BIGINT | Number of manifest files on disk |
-| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
-| `data_file_count` | BIGINT | Number of data files on disk |
-| `data_file_size` | BIGINT | Total size of data files (bytes) |
-| `index_file_count` | BIGINT | Number of index files on disk |
-| `index_file_size` | BIGINT | Total size of index files (bytes) |
## Time Travel
@@ -894,6 +796,66 @@ Columns:
| `total_buckets` | INT | Total bucket count for the partition (0 unless catalog-tracked) |
| `done` | BOOLEAN | Whether the partition is marked done (false unless catalog-tracked) |
+### $physical_files_size
+
+Scan the table directory recursively and compute the total size of all physical files on disk, categorized by file type. By comparing with `$referenced_files_size`, you can identify orphan files that are no longer referenced by any snapshot.
+
+Files are classified by their file name prefix:
+- `manifest-*` / `index-manifest-*` → manifest
+- `index-*` (excluding `index-manifest-*`) → index
+- Everything else → data
+
+```sql
+SELECT * FROM paimon.default.my_table$physical_files_size;
+```
+
+Columns:
+
+| Column | Type | Description |
+|---|---|---|
+| `manifest_file_count` | BIGINT | Number of manifest files on disk |
+| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
+| `data_file_count` | BIGINT | Number of data files on disk |
+| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `index_file_count` | BIGINT | Number of index files on disk |
+| `index_file_size` | BIGINT | Total size of index files (bytes) |
+
+### $referenced_files_size
+
+Compute aggregated manifest/data/index file size summaries for all snapshots referenced by a table, including snapshots from the main branch, tags, and other branches. This is useful for understanding storage usage and for orphan file analysis.
+
+Historical snapshots may be in the process of being cleaned up — if a manifest file has already been deleted, it is gracefully skipped (counted as 0 files/bytes).
+
+```sql
+SELECT * FROM paimon.default.my_table$referenced_files_size;
+```
+
+Columns:
+
+| Column | Type | Description |
+|---|---|---|
+| `source` | STRING | Scope: `total` or `branch:<name>` |
+| `manifest_file_count` | BIGINT | Number of manifest files |
+| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) |
+| `data_file_count` | BIGINT | Number of data files |
+| `data_file_size` | BIGINT | Total size of data files (bytes) |
+| `index_file_count` | BIGINT | Number of index files |
+| `index_file_size` | BIGINT | Total size of index files (bytes) |
+
+The output contains one row per scope:
+- `total` — sum across all branches and tags
+- `branch:main` — main branch snapshots + tag snapshots
+- `branch:<name>` — one row per other branch
+
+To estimate the total size of orphan data files:
+
+```sql
+SELECT p.data_file_size - r.data_file_size AS orphan_data_size
+FROM paimon.default.my_table$physical_files_size p,
+ paimon.default.my_table$referenced_files_size r
+WHERE r.source = 'total';
+```
+
### Branch References
System tables support branch syntax: