This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 08db7c6 feat(datafusion): Add $files system table (#328)
08db7c6 is described below
commit 08db7c6f97865ed123030e04104f6c7f3d6b6831
Author: jerry <[email protected]>
AuthorDate: Tue May 19 18:03:39 2026 +0800
feat(datafusion): Add $files system table (#328)
---
.../datafusion/src/system_tables/files.rs | 573 +++++++++++++++++++++
.../datafusion/src/system_tables/mod.rs | 6 +
.../src/system_tables/row_string_cast.rs | 55 ++
.../integrations/datafusion/tests/system_tables.rs | 93 ++++
4 files changed, 727 insertions(+)
diff --git a/crates/integrations/datafusion/src/system_tables/files.rs
b/crates/integrations/datafusion/src/system_tables/files.rs
new file mode 100644
index 0000000..670b887
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/files.rs
@@ -0,0 +1,573 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Mirrors Java
[FilesTable](https://github.com/apache/paimon/blob/release-1.4/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java).
+
+use std::any::Any;
+use std::collections::{BTreeMap, HashMap};
+use std::sync::{Arc, OnceLock};
+
+use async_trait::async_trait;
+use datafusion::arrow::array::{
+ Int32Array, Int64Array, ListBuilder, RecordBatch, StringArray,
StringBuilder,
+ TimestampMillisecondArray,
+};
+use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema,
SchemaRef, TimeUnit};
+use datafusion::catalog::Session;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use paimon::spec::{BinaryRow, DataField, DataFileMeta, TableSchema};
+use paimon::table::{DataSplit, Table};
+
+use super::row_string_cast::{
+ format_row_as_java_array_string, format_row_as_java_cast_string,
+ format_row_field_as_java_cast_string,
+};
+use crate::error::to_datafusion_error;
+
+pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
+ Ok(Arc::new(FilesTable { table }))
+}
+
+fn files_schema() -> SchemaRef {
+ static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
+ SCHEMA
+ .get_or_init(|| {
+ Arc::new(Schema::new(vec![
+ Field::new("partition", ArrowDataType::Utf8, true),
+ Field::new("bucket", ArrowDataType::Int32, false),
+ Field::new("file_path", ArrowDataType::Utf8, false),
+ Field::new("file_format", ArrowDataType::Utf8, false),
+ Field::new("schema_id", ArrowDataType::Int64, false),
+ Field::new("level", ArrowDataType::Int32, false),
+ Field::new("record_count", ArrowDataType::Int64, false),
+ Field::new("file_size_in_bytes", ArrowDataType::Int64, false),
+ Field::new("min_key", ArrowDataType::Utf8, true),
+ Field::new("max_key", ArrowDataType::Utf8, true),
+ Field::new("null_value_counts", ArrowDataType::Utf8, false),
+ Field::new("min_value_stats", ArrowDataType::Utf8, false),
+ Field::new("max_value_stats", ArrowDataType::Utf8, false),
+ Field::new("min_sequence_number", ArrowDataType::Int64, true),
+ Field::new("max_sequence_number", ArrowDataType::Int64, true),
+ Field::new(
+ "creation_time",
+ ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
+ true,
+ ),
+ Field::new("delete_row_count", ArrowDataType::Int64, true),
+ Field::new("file_source", ArrowDataType::Utf8, true),
+ Field::new("first_row_id", ArrowDataType::Int64, true),
+ Field::new(
+ "write_cols",
+ ArrowDataType::List(Arc::new(Field::new("item",
ArrowDataType::Utf8, true))),
+ true,
+ ),
+ ]))
+ })
+ .clone()
+}
+
+#[derive(Debug)]
+struct FilesTable {
+ table: Table,
+}
+
+#[async_trait]
+impl TableProvider for FilesTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ files_schema()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::View
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> DFResult<Arc<dyn ExecutionPlan>> {
+ let table = self.table.clone();
+ let rows =
+ crate::runtime::await_with_runtime(async move {
collect_file_rows(&table).await })
+ .await
+ .map_err(to_datafusion_error)?;
+
+ let schema = files_schema();
+ let batch = file_rows_to_record_batch(&rows)?;
+
+ Ok(MemorySourceConfig::try_new_exec(
+ &[vec![batch]],
+ schema,
+ projection.cloned(),
+ )?)
+ }
+}
+
+#[derive(Debug)]
+struct FileRow {
+ partition: Option<String>,
+ bucket: i32,
+ file_path: String,
+ file_format: String,
+ schema_id: i64,
+ level: i32,
+ record_count: i64,
+ file_size_in_bytes: i64,
+ min_key: Option<String>,
+ max_key: Option<String>,
+ null_value_counts: String,
+ min_value_stats: String,
+ max_value_stats: String,
+ min_sequence_number: Option<i64>,
+ max_sequence_number: Option<i64>,
+ creation_time: Option<i64>,
+ delete_row_count: Option<i64>,
+ file_source: Option<String>,
+ first_row_id: Option<i64>,
+ write_cols: Option<Vec<String>>,
+}
+
+async fn collect_file_rows(table: &Table) -> paimon::Result<Vec<FileRow>> {
+ let scan = table
+ .new_read_builder()
+ .new_scan()
+ .with_scan_all_files()
+ .plan()
+ .await?;
+ let partition_fields = table.schema().partition_fields();
+ let mut schema_cache = HashMap::new();
+ schema_cache.insert(table.schema().id(), Arc::new(table.schema().clone()));
+
+ let mut rows = Vec::new();
+ for split in scan.splits() {
+ rows.extend(data_split_rows(table, split, &partition_fields, &mut
schema_cache).await?);
+ }
+ Ok(rows)
+}
+
+async fn data_split_rows(
+ table: &Table,
+ split: &DataSplit,
+ partition_fields: &[DataField],
+ schema_cache: &mut HashMap<i64, Arc<TableSchema>>,
+) -> paimon::Result<Vec<FileRow>> {
+ let partition = format_partition(split.partition(), partition_fields)?;
+ let mut rows = Vec::with_capacity(split.data_files().len());
+ for file in split.data_files() {
+ let file_schema = schema_for_id(table, schema_cache,
file.schema_id).await?;
+ let key_fields = key_fields_for_schema(&file_schema);
+ let min_key = format_key(&file.min_key, &key_fields)?;
+ let max_key = format_key(&file.max_key, &key_fields)?;
+ let stats = format_value_stats(table, schema_cache, file).await?;
+
+ rows.push(FileRow {
+ partition: partition.clone(),
+ bucket: split.bucket(),
+ file_path: file
+ .external_path
+ .clone()
+ .unwrap_or_else(|| split.data_file_path(file)),
+ file_format: data_file_format_identifier(&file.file_name)?,
+ schema_id: file.schema_id,
+ level: file.level,
+ record_count: file.row_count,
+ file_size_in_bytes: file.file_size,
+ min_key,
+ max_key,
+ null_value_counts: stats.null_value_counts,
+ min_value_stats: stats.min_value_stats,
+ max_value_stats: stats.max_value_stats,
+ min_sequence_number: Some(file.min_sequence_number),
+ max_sequence_number: Some(file.max_sequence_number),
+ creation_time: file.creation_time.map(|t| t.timestamp_millis()),
+ delete_row_count: file.delete_row_count,
+ file_source: file_source_to_string(file.file_source),
+ first_row_id: file.first_row_id,
+ write_cols: file.write_cols.clone(),
+ });
+ }
+ Ok(rows)
+}
+
+async fn schema_for_id(
+ table: &Table,
+ schema_cache: &mut HashMap<i64, Arc<TableSchema>>,
+ schema_id: i64,
+) -> paimon::Result<Arc<TableSchema>> {
+ if let Some(schema) = schema_cache.get(&schema_id) {
+ return Ok(schema.clone());
+ }
+ let schema = table.schema_manager().schema(schema_id).await?;
+ schema_cache.insert(schema_id, schema.clone());
+ Ok(schema)
+}
+
+fn format_partition(
+ partition: &BinaryRow,
+ partition_fields: &[DataField],
+) -> paimon::Result<Option<String>> {
+ if partition_fields.is_empty() {
+ return Ok(Some("{}".to_string()));
+ }
+ format_row_as_java_cast_string(partition, partition_fields).map(Some)
+}
+
+fn key_fields_for_schema(schema: &TableSchema) -> Vec<DataField> {
+ let trimmed_primary_keys = schema.trimmed_primary_keys();
+ if trimmed_primary_keys.is_empty() {
+ return schema.fields().to_vec();
+ }
+
+ trimmed_primary_keys
+ .iter()
+ .filter_map(|name| {
+ schema
+ .fields()
+ .iter()
+ .find(|field| field.name() == name)
+ .cloned()
+ })
+ .collect()
+}
+
+fn format_key(bytes: &[u8], key_fields: &[DataField]) ->
paimon::Result<Option<String>> {
+ if bytes.is_empty() {
+ return Ok(None);
+ }
+ let row = BinaryRow::from_serialized_bytes(bytes)?;
+ if row.arity() <= 0 {
+ return Ok(None);
+ }
+ format_row_as_java_array_string(&row, key_fields).map(Some)
+}
+
+#[derive(Debug)]
+struct FormattedStats {
+ null_value_counts: String,
+ min_value_stats: String,
+ max_value_stats: String,
+}
+
+async fn format_value_stats(
+ table: &Table,
+ schema_cache: &mut HashMap<i64, Arc<TableSchema>>,
+ file: &DataFileMeta,
+) -> paimon::Result<FormattedStats> {
+ let table_fields = table.schema().fields();
+ let file_schema = schema_for_id(table, schema_cache,
file.schema_id).await?;
+ let data_fields = file_schema.fields();
+ let field_mapping = table_to_data_field_mapping(table_fields, data_fields);
+ let dense_mapping = dense_stats_mapping(data_fields,
file.value_stats_cols.as_deref());
+ let min_row = decode_stats_row(file.value_stats.min_values())?;
+ let max_row = decode_stats_row(file.value_stats.max_values())?;
+
+ let mut null_counts = BTreeMap::new();
+ let mut lower_bounds = BTreeMap::new();
+ let mut upper_bounds = BTreeMap::new();
+
+ for (table_index, table_field) in table_fields.iter().enumerate() {
+ let data_index = field_mapping.get(table_index).copied().flatten();
+ let stats_index =
+ data_index.and_then(|idx| stats_index_for_data_field(idx,
&dense_mapping));
+
+ let null_count = match (data_index, stats_index) {
+ (None, _) => Some(file.row_count),
+ (Some(_), Some(idx)) =>
file.value_stats.null_counts().get(idx).copied().flatten(),
+ (Some(_), None) => None,
+ };
+ null_counts.insert(table_field.name().to_string(), null_count);
+
+ let value_type = data_index
+ .and_then(|idx| data_fields.get(idx))
+ .map(DataField::data_type)
+ .unwrap_or_else(|| table_field.data_type());
+ lower_bounds.insert(
+ table_field.name().to_string(),
+ format_stats_value(min_row.as_ref(), stats_index, value_type)?,
+ );
+ upper_bounds.insert(
+ table_field.name().to_string(),
+ format_stats_value(max_row.as_ref(), stats_index, value_type)?,
+ );
+ }
+
+ Ok(FormattedStats {
+ null_value_counts: format_java_map(&null_counts, |v| {
+ v.map(|count| count.to_string())
+ .unwrap_or_else(|| "null".to_string())
+ }),
+ min_value_stats: format_java_map(&lower_bounds, |v| {
+ v.clone().unwrap_or_else(|| "null".to_string())
+ }),
+ max_value_stats: format_java_map(&upper_bounds, |v| {
+ v.clone().unwrap_or_else(|| "null".to_string())
+ }),
+ })
+}
+
+fn table_to_data_field_mapping(
+ table_fields: &[DataField],
+ data_fields: &[DataField],
+) -> Vec<Option<usize>> {
+ let data_field_index: HashMap<i32, usize> = data_fields
+ .iter()
+ .enumerate()
+ .map(|(idx, field)| (field.id(), idx))
+ .collect();
+ let mapping: Vec<Option<usize>> = table_fields
+ .iter()
+ .map(|field| data_field_index.get(&field.id()).copied())
+ .collect();
+
+ let identity = mapping.len() == data_fields.len()
+ && mapping
+ .iter()
+ .enumerate()
+ .all(|(idx, mapped)| *mapped == Some(idx));
+ if identity {
+ (0..table_fields.len()).map(Some).collect()
+ } else {
+ mapping
+ }
+}
+
+fn dense_stats_mapping(
+ data_fields: &[DataField],
+ dense_fields: Option<&[String]>,
+) -> Option<Vec<Option<usize>>> {
+ dense_fields.map(|dense_fields| {
+ let dense_index: HashMap<&str, usize> = dense_fields
+ .iter()
+ .enumerate()
+ .map(|(idx, name)| (name.as_str(), idx))
+ .collect();
+ data_fields
+ .iter()
+ .map(|field| dense_index.get(field.name()).copied())
+ .collect()
+ })
+}
+
+fn stats_index_for_data_field(
+ data_index: usize,
+ dense_mapping: &Option<Vec<Option<usize>>>,
+) -> Option<usize> {
+ match dense_mapping {
+ None => Some(data_index),
+ Some(mapping) => mapping.get(data_index).copied().flatten(),
+ }
+}
+
+fn decode_stats_row(bytes: &[u8]) -> paimon::Result<Option<BinaryRow>> {
+ if bytes.is_empty() {
+ Ok(None)
+ } else {
+ BinaryRow::from_serialized_bytes(bytes).map(Some)
+ }
+}
+
+fn format_stats_value(
+ row: Option<&BinaryRow>,
+ stats_index: Option<usize>,
+ data_type: &paimon::spec::DataType,
+) -> paimon::Result<Option<String>> {
+ let Some(row) = row else {
+ return Ok(None);
+ };
+ let Some(stats_index) = stats_index else {
+ return Ok(None);
+ };
+ if stats_index >= row.arity() as usize {
+ return Ok(None);
+ }
+ format_row_field_as_java_cast_string(row, stats_index, data_type)
+}
+
+fn format_java_map<T, F>(map: &BTreeMap<String, T>, value_to_string: F) ->
String
+where
+ F: Fn(&T) -> String,
+{
+ let mut out = String::from("{");
+ for (idx, (key, value)) in map.iter().enumerate() {
+ if idx > 0 {
+ out.push_str(", ");
+ }
+ out.push_str(key);
+ out.push('=');
+ out.push_str(&value_to_string(value));
+ }
+ out.push('}');
+ out
+}
+
+fn data_file_format_identifier(file_name: &str) -> paimon::Result<String> {
+ let Some(dot) = file_name.rfind('.') else {
+ return Err(paimon::Error::DataInvalid {
+ message: format!("{file_name} is not a legal file name."),
+ source: None,
+ });
+ };
+
+ let extension = &file_name[dot + 1..];
+ if is_hadoop_compression_extension(extension) {
+ let Some(second_dot) = file_name[..dot].rfind('.') else {
+ return Err(paimon::Error::DataInvalid {
+ message: format!("{file_name} is not a legal file name."),
+ source: None,
+ });
+ };
+ return Ok(file_name[second_dot + 1..dot].to_string());
+ }
+
+ Ok(extension.to_string())
+}
+
+fn is_hadoop_compression_extension(extension: &str) -> bool {
+ ["gz", "bz2", "deflate", "snappy", "lz4", "zst"]
+ .iter()
+ .any(|known| extension.eq_ignore_ascii_case(known))
+}
+
+fn file_source_to_string(file_source: Option<i32>) -> Option<String> {
+ file_source.map(|source| match source {
+ 0 => "APPEND".to_string(),
+ 1 => "COMPACT".to_string(),
+ other => other.to_string(),
+ })
+}
+
+fn file_rows_to_record_batch(rows: &[FileRow]) -> DFResult<RecordBatch> {
+ let n = rows.len();
+ let mut partitions = Vec::with_capacity(n);
+ let mut buckets = Vec::with_capacity(n);
+ let mut file_paths = Vec::with_capacity(n);
+ let mut file_formats = Vec::with_capacity(n);
+ let mut schema_ids = Vec::with_capacity(n);
+ let mut levels = Vec::with_capacity(n);
+ let mut record_counts = Vec::with_capacity(n);
+ let mut file_sizes = Vec::with_capacity(n);
+ let mut min_keys = Vec::with_capacity(n);
+ let mut max_keys = Vec::with_capacity(n);
+ let mut null_value_counts = Vec::with_capacity(n);
+ let mut min_value_stats = Vec::with_capacity(n);
+ let mut max_value_stats = Vec::with_capacity(n);
+ let mut min_sequence_numbers = Vec::with_capacity(n);
+ let mut max_sequence_numbers = Vec::with_capacity(n);
+ let mut creation_times = Vec::with_capacity(n);
+ let mut delete_row_counts = Vec::with_capacity(n);
+ let mut file_sources = Vec::with_capacity(n);
+ let mut first_row_ids = Vec::with_capacity(n);
+ let mut write_cols = ListBuilder::new(StringBuilder::new());
+
+ for row in rows {
+ partitions.push(row.partition.clone());
+ buckets.push(row.bucket);
+ file_paths.push(row.file_path.clone());
+ file_formats.push(row.file_format.clone());
+ schema_ids.push(row.schema_id);
+ levels.push(row.level);
+ record_counts.push(row.record_count);
+ file_sizes.push(row.file_size_in_bytes);
+ min_keys.push(row.min_key.clone());
+ max_keys.push(row.max_key.clone());
+ null_value_counts.push(row.null_value_counts.clone());
+ min_value_stats.push(row.min_value_stats.clone());
+ max_value_stats.push(row.max_value_stats.clone());
+ min_sequence_numbers.push(row.min_sequence_number);
+ max_sequence_numbers.push(row.max_sequence_number);
+ creation_times.push(row.creation_time);
+ delete_row_counts.push(row.delete_row_count);
+ file_sources.push(row.file_source.clone());
+ first_row_ids.push(row.first_row_id);
+ match &row.write_cols {
+ Some(cols) => {
+ for col in cols {
+ write_cols.values().append_value(col);
+ }
+ write_cols.append(true);
+ }
+ None => write_cols.append(false),
+ }
+ }
+
+ Ok(RecordBatch::try_new(
+ files_schema(),
+ vec![
+ Arc::new(StringArray::from(partitions)),
+ Arc::new(Int32Array::from(buckets)),
+ Arc::new(StringArray::from(file_paths)),
+ Arc::new(StringArray::from(file_formats)),
+ Arc::new(Int64Array::from(schema_ids)),
+ Arc::new(Int32Array::from(levels)),
+ Arc::new(Int64Array::from(record_counts)),
+ Arc::new(Int64Array::from(file_sizes)),
+ Arc::new(StringArray::from(min_keys)),
+ Arc::new(StringArray::from(max_keys)),
+ Arc::new(StringArray::from(null_value_counts)),
+ Arc::new(StringArray::from(min_value_stats)),
+ Arc::new(StringArray::from(max_value_stats)),
+ Arc::new(Int64Array::from(min_sequence_numbers)),
+ Arc::new(Int64Array::from(max_sequence_numbers)),
+ Arc::new(TimestampMillisecondArray::from(creation_times)),
+ Arc::new(Int64Array::from(delete_row_counts)),
+ Arc::new(StringArray::from(file_sources)),
+ Arc::new(Int64Array::from(first_row_ids)),
+ Arc::new(write_cols.finish()),
+ ],
+ )?)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_data_file_format_identifier() {
+ assert_eq!(
+ data_file_format_identifier("part-0.parquet").unwrap(),
+ "parquet"
+ );
+ assert_eq!(data_file_format_identifier("part-0.csv.gz").unwrap(),
"csv");
+ assert_eq!(
+ data_file_format_identifier("part-0.orc.zst").unwrap(),
+ "orc"
+ );
+ }
+
+ #[test]
+ fn test_format_java_map() {
+ let mut map = BTreeMap::new();
+ map.insert("b".to_string(), Some(2));
+ map.insert("a".to_string(), None);
+ assert_eq!(
+ format_java_map(&map, |v| v
+ .map(|v| v.to_string())
+ .unwrap_or_else(|| "null".to_string())),
+ "{a=null, b=2}"
+ );
+ }
+}
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs
b/crates/integrations/datafusion/src/system_tables/mod.rs
index af3b2fe..0305d2f 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -30,6 +30,7 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;
mod branches;
+mod files;
mod manifests;
mod options;
mod partitions;
@@ -48,6 +49,7 @@ type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
// metadata via `Catalog::list_partitions`).
const TABLES: &[(&str, Builder)] = &[
("branches", branches::build),
+ ("files", files::build),
("manifests", manifests::build),
("options", options::build),
("physical_files_size", physical_files_size::build),
@@ -60,6 +62,7 @@ const TABLES: &[(&str, Builder)] = &[
const SYSTEM_TABLE_NAMES: &[&str] = &[
"branches",
+ "files",
"manifests",
"options",
"partitions",
@@ -187,6 +190,9 @@ mod tests {
assert!(is_registered("branches"));
assert!(is_registered("Branches"));
assert!(is_registered("BRANCHES"));
+ assert!(is_registered("files"));
+ assert!(is_registered("Files"));
+ assert!(is_registered("FILES"));
assert!(is_registered("tags"));
assert!(is_registered("Tags"));
assert!(is_registered("TAGS"));
diff --git
a/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
b/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
index 1501d1c..11c1d46 100644
--- a/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
+++ b/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
@@ -47,6 +47,35 @@ pub(super) fn format_row_as_java_cast_string(
Ok(out)
}
+pub(super) fn format_row_as_java_array_string(
+ row: &BinaryRow,
+ fields: &[DataField],
+) -> Result<String> {
+ validate_row(row, fields)?;
+
+ let mut out = String::from("[");
+ for (pos, field) in fields.iter().enumerate() {
+ if pos > 0 {
+ out.push_str(", ");
+ }
+ out.push_str(&format_field(row, pos, field.data_type())?);
+ }
+ out.push(']');
+ Ok(out)
+}
+
+pub(super) fn format_row_field_as_java_cast_string(
+ row: &BinaryRow,
+ pos: usize,
+ data_type: &DataType,
+) -> Result<Option<String>> {
+ validate_row_has_field(row, pos)?;
+ if row.is_null_at(pos) {
+ return Ok(None);
+ }
+ format_field(row, pos, data_type).map(Some)
+}
+
fn validate_row(row: &BinaryRow, fields: &[DataField]) -> Result<()> {
if row.arity() < 0 {
return Err(data_invalid(format!(
@@ -74,6 +103,32 @@ fn validate_row(row: &BinaryRow, fields: &[DataField]) ->
Result<()> {
Ok(())
}
+fn validate_row_has_field(row: &BinaryRow, pos: usize) -> Result<()> {
+ if row.arity() < 0 {
+ return Err(data_invalid(format!(
+ "Row string cast row has negative arity {}",
+ row.arity()
+ )));
+ }
+
+ let arity = row.arity() as usize;
+ if pos >= arity {
+ return Err(data_invalid(format!(
+ "Row string cast field index {pos} is outside row arity {arity}"
+ )));
+ }
+
+ let min_size = BinaryRow::cal_fix_part_size_in_bytes(row.arity()) as usize;
+ if row.data().len() < min_size {
+ return Err(data_invalid(format!(
+ "Row string cast row data too short: need at least {min_size}
bytes, got {}",
+ row.data().len()
+ )));
+ }
+
+ Ok(())
+}
+
fn format_field(row: &BinaryRow, pos: usize, data_type: &DataType) ->
Result<String> {
let Some(datum) = row.get_datum(pos, data_type)? else {
return Ok("null".to_string());
diff --git a/crates/integrations/datafusion/tests/system_tables.rs
b/crates/integrations/datafusion/tests/system_tables.rs
index e6d1717..ad4805d 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -786,6 +786,99 @@ async fn test_manifests_system_table_partition_stats() {
assert_eq!(max_partition, Some(2));
}
+#[tokio::test]
+async fn test_files_system_table() {
+ let (ctx, catalog, _tmp) = create_context().await;
+ let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$files");
+ let batches = run_sql(&ctx, &sql).await;
+
+ assert!(
+ !batches.is_empty(),
+ "$files should return at least one batch"
+ );
+ let arrow_schema = batches[0].schema();
+ let expected_columns = [
+ ("partition", DataType::Utf8),
+ ("bucket", DataType::Int32),
+ ("file_path", DataType::Utf8),
+ ("file_format", DataType::Utf8),
+ ("schema_id", DataType::Int64),
+ ("level", DataType::Int32),
+ ("record_count", DataType::Int64),
+ ("file_size_in_bytes", DataType::Int64),
+ ("min_key", DataType::Utf8),
+ ("max_key", DataType::Utf8),
+ ("null_value_counts", DataType::Utf8),
+ ("min_value_stats", DataType::Utf8),
+ ("max_value_stats", DataType::Utf8),
+ ("min_sequence_number", DataType::Int64),
+ ("max_sequence_number", DataType::Int64),
+ (
+ "creation_time",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ ),
+ ("delete_row_count", DataType::Int64),
+ ("file_source", DataType::Utf8),
+ ("first_row_id", DataType::Int64),
+ (
+ "write_cols",
+ DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
+ ),
+ ];
+ for (i, (name, dtype)) in expected_columns.iter().enumerate() {
+ let field = arrow_schema.field(i);
+ assert_eq!(field.name(), name, "column {i} name");
+ assert_eq!(field.data_type(), dtype, "column {i} type");
+ }
+
+ let identifier = Identifier::new("default".to_string(),
FIXTURE_TABLE.to_string());
+ let table = catalog.get_table(&identifier).await.unwrap();
+ let plan = table
+ .new_read_builder()
+ .new_scan()
+ .with_scan_all_files()
+ .plan()
+ .await
+ .unwrap();
+ let expected_rows: usize = plan
+ .splits()
+ .iter()
+ .map(|split| split.data_files().len())
+ .sum();
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, expected_rows);
+ assert!(total_rows > 0, "fixture should contain data files");
+
+ for batch in &batches {
+ let paths = batch
+ .column(2)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("file_path is Utf8");
+ let formats = batch
+ .column(3)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("file_format is Utf8");
+ let null_counts = batch
+ .column(10)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("null_value_counts is Utf8");
+ for i in 0..batch.num_rows() {
+ assert!(!paths.value(i).is_empty(), "file_path must be non-empty");
+ assert!(
+ !formats.value(i).is_empty(),
+ "file_format must be non-empty"
+ );
+ assert!(
+ null_counts.value(i).starts_with('{') &&
null_counts.value(i).ends_with('}'),
+ "null_value_counts should be Java-map-like"
+ );
+ }
+ }
+}
+
fn single_int_partition_stat(value: &str) -> i32 {
value
.strip_prefix('{')