This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ff1faa5 Display partition stats in manifests system table (#308)
ff1faa5 is described below
commit ff1faa50fd2b30379fd8a22746be2a97067ad344
Author: QuakeWang <[email protected]>
AuthorDate: Mon May 11 13:53:58 2026 +0800
Display partition stats in manifests system table (#308)
---
crates/integrations/datafusion/Cargo.toml | 1 +
.../datafusion/src/system_tables/manifests.rs | 162 ++++++-
.../datafusion/src/system_tables/mod.rs | 1 +
.../src/system_tables/row_string_cast.rs | 517 +++++++++++++++++++++
.../integrations/datafusion/tests/system_tables.rs | 77 +++
docs/src/sql.md | 4 +-
6 files changed, 756 insertions(+), 6 deletions(-)
diff --git a/crates/integrations/datafusion/Cargo.toml
b/crates/integrations/datafusion/Cargo.toml
index a9de48a..30b5100 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -41,6 +41,7 @@ futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { workspace = true, features = ["rt", "time", "fs"] }
+lexical-write-float = "1.0.6"
uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
diff --git a/crates/integrations/datafusion/src/system_tables/manifests.rs
b/crates/integrations/datafusion/src/system_tables/manifests.rs
index 12b2864..89bf66c 100644
--- a/crates/integrations/datafusion/src/system_tables/manifests.rs
+++ b/crates/integrations/datafusion/src/system_tables/manifests.rs
@@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::{Arc, OnceLock};
use async_trait::async_trait;
-use datafusion::arrow::array::{new_null_array, Int64Array, RecordBatch,
StringArray};
+use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
@@ -29,11 +29,15 @@ use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
-use paimon::spec::{ManifestFileMeta, ManifestList};
+use paimon::spec::{BinaryRow, DataField, ManifestFileMeta, ManifestList};
use paimon::table::{SnapshotManager, Table};
+use super::row_string_cast::format_row_as_java_cast_string;
use crate::error::to_datafusion_error;
+/// Position of the `min_partition_stats` column in the `$manifests` schema.
+/// Compared against the scan projection to decide whether the column must be
+/// formatted; must stay in sync with the schema's column order.
+const MIN_PARTITION_STATS_INDEX: usize = 5;
+/// Position of the `max_partition_stats` column in the `$manifests` schema.
+/// Compared against the scan projection to decide whether the column must be
+/// formatted; must stay in sync with the schema's column order.
+const MAX_PARTITION_STATS_INDEX: usize = 6;
+
+/// Wraps `table` in the `$manifests` system table provider.
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
Ok(Arc::new(ManifestsTable { table }))
}
@@ -95,15 +99,42 @@ impl TableProvider for ManifestsTable {
let mut num_added = Vec::with_capacity(n);
let mut num_deleted = Vec::with_capacity(n);
let mut schema_ids = Vec::with_capacity(n);
+ let mut min_partition_stats: Vec<Option<String>> =
Vec::with_capacity(n);
+ let mut max_partition_stats: Vec<Option<String>> =
Vec::with_capacity(n);
let mut min_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
let mut max_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
+ let partition_fields = self.table.schema().partition_fields();
+ let projected_columns = projection.map(Vec::as_slice);
+ let materialize_min_partition_stats =
+ should_materialize_column(projected_columns,
MIN_PARTITION_STATS_INDEX);
+ let materialize_max_partition_stats =
+ should_materialize_column(projected_columns,
MAX_PARTITION_STATS_INDEX);
for meta in metas {
+ let stats = meta.partition_stats();
file_names.push(meta.file_name().to_string());
file_sizes.push(meta.file_size());
num_added.push(meta.num_added_files());
num_deleted.push(meta.num_deleted_files());
schema_ids.push(meta.schema_id());
+ min_partition_stats.push(
+ materialize_partition_stats_value(
+ materialize_min_partition_stats,
+ stats.min_values(),
+ stats.null_counts(),
+ &partition_fields,
+ )
+ .map_err(to_datafusion_error)?,
+ );
+ max_partition_stats.push(
+ materialize_partition_stats_value(
+ materialize_max_partition_stats,
+ stats.max_values(),
+ stats.null_counts(),
+ &partition_fields,
+ )
+ .map_err(to_datafusion_error)?,
+ );
min_row_ids.push(meta.min_row_id());
max_row_ids.push(meta.max_row_id());
}
@@ -117,8 +148,8 @@ impl TableProvider for ManifestsTable {
Arc::new(Int64Array::from(num_added)),
Arc::new(Int64Array::from(num_deleted)),
Arc::new(Int64Array::from(schema_ids)),
- new_null_array(&DataType::Utf8, n),
- new_null_array(&DataType::Utf8, n),
+ Arc::new(StringArray::from(min_partition_stats)),
+ Arc::new(StringArray::from(max_partition_stats)),
Arc::new(Int64Array::from(min_row_ids)),
Arc::new(Int64Array::from(max_row_ids)),
],
@@ -159,3 +190,126 @@ async fn collect_manifests(table: &Table) ->
paimon::Result<Vec<ManifestFileMeta
metas.extend(changelog);
Ok(metas)
}
+
+/// Returns `true` when the column at `column_index` has to be computed.
+///
+/// A `None` projection requests every column; otherwise only the listed
+/// column indices are materialized.
+fn should_materialize_column(projection: Option<&[usize]>, column_index: usize) -> bool {
+    let Some(requested) = projection else {
+        return true;
+    };
+    requested.contains(&column_index)
+}
+
+/// Formats one side of the partition stats only when its column is requested.
+///
+/// When `materialize` is `false`, decoding is skipped entirely and `Ok(None)`
+/// is returned; the caller stores that as a null cell. Otherwise the work is
+/// delegated to `format_partition_stats_value`.
+fn materialize_partition_stats_value(
+    materialize: bool,
+    value_bytes: &[u8],
+    null_counts: &[Option<i64>],
+    partition_fields: &[DataField],
+) -> paimon::Result<Option<String>> {
+    if !materialize {
+        return Ok(None);
+    }
+    format_partition_stats_value(value_bytes, null_counts, partition_fields)
+}
+
+/// Renders one side (min or max) of a manifest's partition stats as a string.
+///
+/// Non-empty `value_bytes` are decoded as a serialized `BinaryRow` and
+/// rendered with `format_row_as_java_cast_string`. Empty bytes carry no row.
+/// They are rendered as an all-`null` row when there are no partition fields
+/// or when `null_counts` has exactly one entry per partition field. In every
+/// other case the result is `None` (unknown).
+///
+/// # Errors
+/// Propagates decoding errors from `BinaryRow::from_serialized_bytes` and
+/// formatting errors from the row formatter.
+fn format_partition_stats_value(
+    value_bytes: &[u8],
+    null_counts: &[Option<i64>],
+    partition_fields: &[DataField],
+) -> paimon::Result<Option<String>> {
+    if !value_bytes.is_empty() {
+        let row = BinaryRow::from_serialized_bytes(value_bytes)?;
+        return format_row_as_java_cast_string(&row, partition_fields).map(Some);
+    }
+
+    let arity = partition_fields.len();
+    let treat_as_all_null = arity == 0 || null_counts.len() == arity;
+    Ok(treat_as_all_null.then(|| format_all_null_partition_row(arity)))
+}
+
+/// Builds the row-cast string for a row whose `arity` fields are all null.
+///
+/// For example, arity 2 gives `{null, null}` and arity 0 gives `{}`.
+fn format_all_null_partition_row(arity: usize) -> String {
+    let mut rendered = String::from("{");
+    for index in 0..arity {
+        if index > 0 {
+            rendered.push_str(", ");
+        }
+        rendered.push_str("null");
+    }
+    rendered.push('}');
+    rendered
+}
+
+// Unit tests for projection-aware materialization and for rendering partition
+// stats (serialized rows, empty rows, and empty bytes with/without null counts).
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use paimon::spec::{DataType as PaimonDataType, Datum, FloatType, IntType,
VarCharType};
+
+ // Builds a partition field; the id is fixed at 0 because formatting only
+ // looks at the field's data type.
+ fn field(name: &str, data_type: PaimonDataType) -> DataField {
+ DataField::new(0, name.to_string(), data_type)
+ }
+
+ // Encodes (value, type) pairs into the serialized BinaryRow bytes that
+ // `format_partition_stats_value` decodes with `BinaryRow::from_serialized_bytes`.
+ fn serialized_row(values: &[(Option<Datum>, PaimonDataType)]) -> Vec<u8> {
+ let refs: Vec<_> = values
+ .iter()
+ .map(|(datum, data_type)| (datum.as_ref(), data_type))
+ .collect();
+ BinaryRow::from_datums(&refs).to_serialized_bytes()
+ }
+
+ #[test]
+ fn test_should_materialize_column() {
+ let projected_stats = vec![MIN_PARTITION_STATS_INDEX];
+ let projected_without_stats = vec![0, 1, 2];
+
+ // No projection means every column is materialized.
+ assert!(should_materialize_column(None, MIN_PARTITION_STATS_INDEX));
+ assert!(should_materialize_column(
+ Some(projected_stats.as_slice()),
+ MIN_PARTITION_STATS_INDEX
+ ));
+ assert!(!should_materialize_column(
+ Some(projected_without_stats.as_slice()),
+ MIN_PARTITION_STATS_INDEX
+ ));
+ }
+
+ #[test]
+ fn test_unprojected_partition_stats_are_not_formatted() {
+ let data_type = PaimonDataType::Float(FloatType::new());
+ let fields = vec![field("pt", data_type.clone())];
+ let bytes = serialized_row(&[(Some(Datum::Float(1.0)),
data_type.clone())]);
+
+ // Same bytes: skipped when not materialized, rendered Java-style when it is.
+ assert_eq!(
+ materialize_partition_stats_value(false, &bytes, &[Some(0)],
&fields).unwrap(),
+ None
+ );
+ assert_eq!(
+ materialize_partition_stats_value(true, &bytes, &[Some(0)],
&fields).unwrap(),
+ Some("{1.0}".to_string())
+ );
+ }
+
+ // Unpartitioned table: empty bytes and no fields render as an empty row.
+ #[test]
+ fn test_format_empty_partition_row() {
+ assert_eq!(
+ format_partition_stats_value(&[], &[], &[]).unwrap(),
+ Some("{}".to_string())
+ );
+ }
+
+ // Empty bytes with one null count per field render as an all-null row.
+ #[test]
+ fn test_format_empty_bytes_with_matching_null_counts_as_all_null() {
+ let fields = vec![
+ field("pt1", PaimonDataType::Int(IntType::new())),
+ field("pt2", PaimonDataType::VarChar(VarCharType::string_type())),
+ ];
+ assert_eq!(
+ format_partition_stats_value(&[], &[Some(2), Some(2)],
&fields).unwrap(),
+ Some("{null, null}".to_string())
+ );
+ }
+
+ // Empty bytes whose null counts do not cover every field are reported as unknown.
+ #[test]
+ fn test_format_empty_bytes_with_mismatched_null_counts_as_unknown() {
+ let fields = vec![field("pt", PaimonDataType::Int(IntType::new()))];
+ assert_eq!(
+ format_partition_stats_value(&[], &[], &fields).unwrap(),
+ None
+ );
+ }
+}
diff --git a/crates/integrations/datafusion/src/system_tables/mod.rs
b/crates/integrations/datafusion/src/system_tables/mod.rs
index 1bdc0d1..4233667 100644
--- a/crates/integrations/datafusion/src/system_tables/mod.rs
+++ b/crates/integrations/datafusion/src/system_tables/mod.rs
@@ -32,6 +32,7 @@ use crate::error::to_datafusion_error;
mod branches;
mod manifests;
mod options;
+mod row_string_cast;
mod schemas;
mod snapshots;
mod tags;
diff --git
a/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
b/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
new file mode 100644
index 0000000..1501d1c
--- /dev/null
+++ b/crates/integrations/datafusion/src/system_tables/row_string_cast.rs
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::num::NonZeroI32;
+
+use chrono::{Local, NaiveDate, NaiveDateTime, TimeZone, Timelike};
+use lexical_write_float::{format::STANDARD, Options, ToLexicalWithOptions};
+use paimon::spec::{BinaryRow, DataField, DataType, Datum};
+use paimon::{Error, Result};
+
+/// Number of milliseconds in one day. Used to normalize negative time-of-day
+/// values and to split epoch milliseconds into days and millis-of-day.
+const MILLIS_PER_DAY: i64 = 86_400_000;
+/// `lexical` float formatting options chosen to mimic Java's
+/// `Float.toString` / `Double.toString` display.
+///
+/// - Plain notation is used for decimal exponents from -3 through 6, i.e.
+///   magnitudes in [1e-3, 1e7).
+/// - Scientific notation with an upper-case `E` is used otherwise.
+/// - Infinite values are written as `Infinity`.
+///
+/// See `test_format_float_double_uses_java_display_thresholds` for concrete
+/// outputs such as `9999999.0`, `1.0E7`, `9.99999E-4` and `NaN`.
+const JAVA_FLOAT_OPTIONS: Options = Options::builder()
+ .positive_exponent_break(NonZeroI32::new(6))
+ .negative_exponent_break(NonZeroI32::new(-3))
+ .exponent(b'E')
+ .inf_string(Some(b"Infinity"))
+ .build_strict();
+
+/// Renders `row` as a brace-wrapped, comma-separated list of its fields,
+/// e.g. `{1, a, null}`.
+///
+/// This is intended to match Paimon Java's row-to-string cast. Each field is
+/// formatted according to its declared type in `fields`.
+///
+/// # Errors
+/// Fails when the row's arity or fixed-length region does not fit `fields`,
+/// or when any field cannot be decoded or formatted.
+pub(super) fn format_row_as_java_cast_string(
+    row: &BinaryRow,
+    fields: &[DataField],
+) -> Result<String> {
+    validate_row(row, fields)?;
+
+    let cells = fields
+        .iter()
+        .enumerate()
+        .map(|(pos, field)| format_field(row, pos, field.data_type()))
+        .collect::<Result<Vec<String>>>()?;
+    Ok(format!("{{{}}}", cells.join(", ")))
+}
+
+/// Checks that `row` can be formatted against `fields` before any field is
+/// read.
+///
+/// - The arity must be non-negative and equal to `fields.len()`.
+/// - The backing data must be at least as long as the fixed-length part
+///   required for that arity.
+fn validate_row(row: &BinaryRow, fields: &[DataField]) -> Result<()> {
+    let raw_arity = row.arity();
+    if raw_arity < 0 {
+        return Err(data_invalid(format!(
+            "Row string cast row has negative arity {raw_arity}"
+        )));
+    }
+
+    let arity = raw_arity as usize;
+    let field_count = fields.len();
+    if arity != field_count {
+        return Err(data_invalid(format!(
+            "Row string cast row arity {arity} does not match field count {field_count}"
+        )));
+    }
+
+    let min_size = BinaryRow::cal_fix_part_size_in_bytes(raw_arity) as usize;
+    let actual_size = row.data().len();
+    if actual_size < min_size {
+        return Err(data_invalid(format!(
+            "Row string cast row data too short: need at least {min_size} bytes, got {actual_size}"
+        )));
+    }
+
+    Ok(())
+}
+
+/// Formats the field at `pos` according to `data_type`, or returns `null` when
+/// the field is null.
+///
+/// # Errors
+/// Fails when the datum cannot be read, when a date/time value is out of
+/// range, or when the decoded datum variant does not correspond to
+/// `data_type`.
+fn format_field(row: &BinaryRow, pos: usize, data_type: &DataType) -> Result<String> {
+    let datum = match row.get_datum(pos, data_type)? {
+        Some(datum) => datum,
+        None => return Ok("null".to_string()),
+    };
+
+    let rendered = match (datum, data_type) {
+        (Datum::Bool(v), DataType::Boolean(_)) => v.to_string(),
+        (Datum::TinyInt(v), DataType::TinyInt(_)) => v.to_string(),
+        (Datum::SmallInt(v), DataType::SmallInt(_)) => v.to_string(),
+        (Datum::Int(v), DataType::Int(_)) => v.to_string(),
+        (Datum::Long(v), DataType::BigInt(_)) => v.to_string(),
+        (Datum::Float(v), DataType::Float(_)) => format_float(v),
+        (Datum::Double(v), DataType::Double(_)) => format_double(v),
+        (Datum::String(v), DataType::Char(_) | DataType::VarChar(_)) => v,
+        // Binary payloads are decoded as UTF-8; invalid sequences become U+FFFD.
+        (Datum::Bytes(v), DataType::Binary(_) | DataType::VarBinary(_)) => {
+            String::from_utf8_lossy(&v).into_owned()
+        }
+        (Datum::Date(v), DataType::Date(_)) => format_date(v)?,
+        (Datum::Time(v), DataType::Time(t)) => format_time(v, t.precision())?,
+        (Datum::Decimal { unscaled, scale, .. }, DataType::Decimal(_)) => {
+            format_decimal_plain(unscaled, scale)
+        }
+        (Datum::Timestamp { millis, nanos }, DataType::Timestamp(t)) => {
+            format_timestamp(millis, nanos, t.precision())?
+        }
+        (Datum::LocalZonedTimestamp { millis, nanos }, DataType::LocalZonedTimestamp(t)) => {
+            format_local_zoned_timestamp(millis, nanos, t.precision())?
+        }
+        (datum, _) => {
+            return Err(data_invalid(format!(
+                "Decoded row string cast datum {datum:?} does not match type {data_type:?}"
+            )));
+        }
+    };
+    Ok(rendered)
+}
+
+/// Formats an `f32` in the Java `Float.toString` style configured by
+/// `JAVA_FLOAT_OPTIONS`, e.g. `1.0` or `-0.0`.
+fn format_float(value: f32) -> String {
+    const BUFFER_SIZE: usize = JAVA_FLOAT_OPTIONS.buffer_size_const::<f32, STANDARD>();
+    let mut scratch = [0u8; BUFFER_SIZE];
+    let written = value.to_lexical_with_options::<STANDARD>(&mut scratch, &JAVA_FLOAT_OPTIONS);
+    String::from_utf8(written.to_vec()).expect("lexical float output is valid UTF-8")
+}
+
+/// Formats an `f64` in the Java `Double.toString` style configured by
+/// `JAVA_FLOAT_OPTIONS`, e.g. `9999999.0`, `1.0E7` or `Infinity`.
+fn format_double(value: f64) -> String {
+    const BUFFER_SIZE: usize = JAVA_FLOAT_OPTIONS.buffer_size_const::<f64, STANDARD>();
+    let mut scratch = [0u8; BUFFER_SIZE];
+    let written = value.to_lexical_with_options::<STANDARD>(&mut scratch, &JAVA_FLOAT_OPTIONS);
+    String::from_utf8(written.to_vec()).expect("lexical double output is valid UTF-8")
+}
+
+/// Formats a DATE given as days since 1970-01-01 using `%Y-%m-%d`
+/// (e.g. 19_723 -> `2024-01-01`).
+///
+/// # Errors
+/// Returns `Error::DataInvalid` when the day count falls outside the range
+/// chrono's `NaiveDate` can represent.
+fn format_date(epoch_days: i32) -> Result<String> {
+    // 1970-01-01 is day 719_163 in chrono's days-from-CE numbering
+    // (0001-01-01 is day 1).
+    const UNIX_EPOCH_DAYS_FROM_CE: i32 = 719_163;
+    let out_of_range = || {
+        data_invalid(format!(
+            "Date row string cast value {epoch_days} is outside supported range"
+        ))
+    };
+    let date = epoch_days
+        .checked_add(UNIX_EPOCH_DAYS_FROM_CE)
+        .and_then(NaiveDate::from_num_days_from_ce_opt)
+        .ok_or_else(out_of_range)?;
+    Ok(date.format("%Y-%m-%d").to_string())
+}
+
+/// Formats a TIME value given as milliseconds of the day as `HH:MM:SS[.f...]`.
+///
+/// This is intended to mirror the Java time-to-string cast:
+/// - Negative inputs are shifted forward by whole days.
+/// - When `precision > 0`, at most `precision` fractional digits are printed.
+///   Output stops as soon as the remaining millisecond digits are all zero,
+///   but at least one digit is always printed (`12:34:56.000` renders as
+///   `12:34:56.0`).
+fn format_time(millis_of_day: i32, precision: u32) -> Result<String> {
+    let mut total = i64::from(millis_of_day);
+    if total < 0 {
+        total = total.rem_euclid(MILLIS_PER_DAY);
+    }
+
+    let hours = total / 3_600_000;
+    let minutes = total % 3_600_000 / 60_000;
+    let seconds = total % 60_000 / 1_000;
+    let mut out = format!("{hours:02}:{minutes:02}:{seconds:02}");
+
+    if precision > 0 {
+        out.push('.');
+        let mut frac = total % 1_000;
+        for _ in 0..precision {
+            out.push(char::from(b'0' + (frac / 100) as u8));
+            frac = frac % 100 * 10;
+            if frac == 0 {
+                break;
+            }
+        }
+    }
+
+    Ok(out)
+}
+
+/// Formats a decimal given as `unscaled * 10^-scale` in plain (non-scientific)
+/// notation, always keeping exactly `scale` fractional digits and at least one
+/// integer digit (e.g. `(-100, 3)` -> `-0.100`).
+fn format_decimal_plain(unscaled: i128, scale: u32) -> String {
+    if scale == 0 {
+        return unscaled.to_string();
+    }
+
+    let scale = scale as usize;
+    // `unsigned_abs` is exact even for `i128::MIN`. Zero-padding to
+    // `scale + 1` digits guarantees a non-empty integer part.
+    let digits = format!("{:0>width$}", unscaled.unsigned_abs(), width = scale + 1);
+    let (int_part, frac_part) = digits.split_at(digits.len() - scale);
+    let sign = if unscaled < 0 { "-" } else { "" };
+    format!("{sign}{int_part}.{frac_part}")
+}
+
+/// Formats a TIMESTAMP (without time zone), given as epoch millis plus
+/// nano-of-millisecond, keeping `precision` fractional-second digits.
+fn format_timestamp(millis: i64, nano_of_milli: i32, precision: u32) -> Result<String> {
+    let naive = millis_to_naive_datetime(millis, nano_of_milli)?;
+    format_timestamp_naive(naive, precision)
+}
+
+/// Formats a TIMESTAMP WITH LOCAL TIME ZONE value as wall-clock time in the
+/// process's local time zone (chrono `Local`), keeping `precision`
+/// fractional-second digits.
+///
+/// # Errors
+/// Fails for an out-of-range `nano_of_milli`, for an instant chrono cannot map
+/// to a single local time, or for an invalid precision.
+fn format_local_zoned_timestamp(millis: i64, nano_of_milli: i32, precision: u32) -> Result<String> {
+    let nanos = timestamp_nanos(millis, nano_of_milli)?;
+    let epoch_seconds = millis.div_euclid(1000);
+    let wall_clock = match Local.timestamp_opt(epoch_seconds, nanos).single() {
+        Some(local) => local.naive_local(),
+        None => {
+            return Err(data_invalid(format!(
+                "Invalid local zoned timestamp millis {millis}"
+            )));
+        }
+    };
+    format_timestamp_naive(wall_clock, precision)
+}
+
+/// Formats `dt` as `%Y-%m-%d %H:%M:%S`.
+///
+/// When `precision > 0`, a `.` is appended, followed by the first `precision`
+/// digits of the zero-padded 9-digit nanosecond field. Extra digits are
+/// truncated, not rounded.
+///
+/// # Errors
+/// Returns `Error::DataInvalid` when `precision` exceeds 9.
+fn format_timestamp_naive(dt: NaiveDateTime, precision: u32) -> Result<String> {
+    let digits = usize::try_from(precision).map_err(|e| Error::DataInvalid {
+        message: format!("Timestamp row string cast precision {precision} is invalid"),
+        source: Some(Box::new(e)),
+    })?;
+    if digits > 9 {
+        return Err(data_invalid(format!(
+            "Timestamp row string cast precision {digits} is outside 0..=9"
+        )));
+    }
+
+    let whole_seconds = dt.format("%Y-%m-%d %H:%M:%S").to_string();
+    if digits == 0 {
+        return Ok(whole_seconds);
+    }
+    let padded_nanos = format!("{:09}", dt.nanosecond());
+    Ok(format!("{whole_seconds}.{}", &padded_nanos[..digits]))
+}
+
+/// Converts epoch milliseconds plus a nano-of-millisecond component into a
+/// `NaiveDateTime` with no time-zone offset applied.
+///
+/// # Errors
+/// Returns `Error::DataInvalid` in two cases:
+/// - `nano_of_milli` is outside 0..=999999 (checked by `timestamp_nanos`);
+/// - the resulting date is outside the range chrono can represent.
+fn millis_to_naive_datetime(millis: i64, nano_of_milli: i32) ->
Result<NaiveDateTime> {
+ let nanos = timestamp_nanos(millis, nano_of_milli)?;
+ // Euclidean division: negative millis map to an earlier day with a
+ // non-negative millis-of-day.
+ let days = millis.div_euclid(MILLIS_PER_DAY);
+ let millis_of_day = millis.rem_euclid(MILLIS_PER_DAY) as u64;
+ // `nanos % 1_000_000` recovers the validated nano-of-millisecond part.
+ let nano_of_day = millis_of_day * 1_000_000 + u64::from(nanos % 1_000_000);
+ // Shift from days-since-1970-01-01 to chrono's days-from-CE numbering
+ // (0001-01-01 is day 1, 1970-01-01 is day 719_163).
+ let ce_days = days.checked_add(719_163).ok_or_else(|| {
+ data_invalid(format!(
+ "Timestamp row string cast millis {millis} is outside supported
range"
+ ))
+ })?;
+ let ce_days = i32::try_from(ce_days).map_err(|e| Error::DataInvalid {
+ message: format!("Timestamp row string cast millis {millis} is outside
supported range"),
+ source: Some(Box::new(e)),
+ })?;
+ let date = NaiveDate::from_num_days_from_ce_opt(ce_days).ok_or_else(|| {
+ data_invalid(format!(
+ "Timestamp row string cast millis {millis} is outside supported
range"
+ ))
+ })?;
+ // Split nano-of-day into whole seconds and the sub-second nanoseconds.
+ let time = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
+ (nano_of_day / 1_000_000_000) as u32,
+ (nano_of_day % 1_000_000_000) as u32,
+ )
+ .ok_or_else(|| data_invalid(format!("Invalid timestamp millis
{millis}")))?;
+ Ok(NaiveDateTime::new(date, time))
+}
+
+/// Combines the millisecond-of-second part of `millis` with `nano_of_milli`
+/// into a nanosecond-of-second value.
+///
+/// The millisecond part uses Euclidean remainder, so negative epochs are
+/// handled.
+///
+/// # Errors
+/// Returns `Error::DataInvalid` when `nano_of_milli` is outside 0..=999999.
+fn timestamp_nanos(millis: i64, nano_of_milli: i32) -> Result<u32> {
+    match u32::try_from(nano_of_milli) {
+        Ok(sub_milli) if sub_milli <= 999_999 => {
+            let milli_of_second = millis.rem_euclid(1000) as u32;
+            Ok(milli_of_second * 1_000_000 + sub_milli)
+        }
+        _ => Err(data_invalid(format!(
+            "Timestamp nano-of-millisecond {nano_of_milli} is outside 0..=999999"
+        ))),
+    }
+}
+
+fn data_invalid(message: String) -> Error {
+ Error::DataInvalid {
+ message,
+ source: None,
+ }
+}
+
+// Unit tests for the Java-style row string cast: per-type rendering, float
+// display thresholds, nulls, timestamps, and malformed/unsupported input.
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use paimon::spec::{
+ BigIntType, BinaryType, BlobType, BooleanType, CharType, DateType,
DecimalType, DoubleType,
+ FloatType, IntType, LocalZonedTimestampType, SmallIntType, TimeType,
TimestampType,
+ TinyIntType, VarBinaryType, VarCharType,
+ };
+
+ // Field named `f{id}`; only the data type matters to the formatter.
+ fn field(id: i32, data_type: DataType) -> DataField {
+ DataField::new(id, format!("f{id}"), data_type)
+ }
+
+ // Encodes (value, type) pairs into a BinaryRow; `None` values become nulls.
+ fn row(values: &[(Option<Datum>, DataType)]) -> BinaryRow {
+ let refs: Vec<_> = values
+ .iter()
+ .map(|(datum, data_type)| (datum.as_ref(), data_type))
+ .collect();
+ BinaryRow::from_datums(&refs)
+ }
+
+ // Builds a row and a matching field list from the same pairs, then formats it.
+ fn format_value(values: &[(Option<Datum>, DataType)]) -> Result<String> {
+ let fields: Vec<_> = values
+ .iter()
+ .enumerate()
+ .map(|(i, (_, data_type))| field(i as i32, data_type.clone()))
+ .collect();
+ format_row_as_java_cast_string(&row(values), &fields)
+ }
+
+ // Expected values: 19_723 epoch days is 2024-01-01; 45_296_000 ms is
+ // 12:34:56 and TIME(3) prints one trailing `.0`; unscaled -100 at scale 3
+ // is -0.100; 1_704_110_400_123 ms + 456_000 ns is 2024-01-01 12:00:00.123456.
+ #[test]
+ fn test_format_supported_scalar_types() {
+ let values = vec![
+ (
+ Some(Datum::Bool(true)),
+ DataType::Boolean(BooleanType::new()),
+ ),
+ (
+ Some(Datum::TinyInt(-1)),
+ DataType::TinyInt(TinyIntType::new()),
+ ),
+ (
+ Some(Datum::SmallInt(2)),
+ DataType::SmallInt(SmallIntType::new()),
+ ),
+ (Some(Datum::Int(3)), DataType::Int(IntType::new())),
+ (Some(Datum::Long(4)), DataType::BigInt(BigIntType::new())),
+ (Some(Datum::Float(1.0)), DataType::Float(FloatType::new())),
+ (
+ Some(Datum::Double(10_000_000.0)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (
+ Some(Datum::String("c".to_string())),
+ DataType::Char(CharType::new(1).unwrap()),
+ ),
+ (
+ Some(Datum::Bytes(b"xy".to_vec())),
+ DataType::Binary(BinaryType::new(2).unwrap()),
+ ),
+ (
+ Some(Datum::Bytes(b"abc".to_vec())),
+ DataType::VarBinary(VarBinaryType::new(3).unwrap()),
+ ),
+ (Some(Datum::Date(19_723)), DataType::Date(DateType::new())),
+ (
+ Some(Datum::Time(45_296_000)),
+ DataType::Time(TimeType::new(3).unwrap()),
+ ),
+ (
+ Some(Datum::Decimal {
+ unscaled: -100,
+ precision: 10,
+ scale: 3,
+ }),
+ DataType::Decimal(DecimalType::new(10, 3).unwrap()),
+ ),
+ (
+ Some(Datum::Timestamp {
+ millis: 1_704_110_400_123,
+ nanos: 456_000,
+ }),
+ DataType::Timestamp(TimestampType::new(6).unwrap()),
+ ),
+ ];
+
+ assert_eq!(
+ format_value(&values).unwrap(),
+ "{true, -1, 2, 3, 4, 1.0, 1.0E7, c, xy, abc, 2024-01-01,
12:34:56.0, -0.100, 2024-01-01 12:00:00.123456}"
+ );
+ }
+
+ // Plain notation below 1e7 and at/above 1e-3, scientific outside that range,
+ // plus signed zero, infinities and NaN.
+ #[test]
+ fn test_format_float_double_uses_java_display_thresholds() {
+ let values = vec![
+ (
+ Some(Datum::Double(9_999_999.0)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (
+ Some(Datum::Double(10_000_000.0)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (
+ Some(Datum::Double(0.001)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (
+ Some(Datum::Double(0.000_999_999)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (Some(Datum::Float(-0.0)), DataType::Float(FloatType::new())),
+ (
+ Some(Datum::Double(f64::INFINITY)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (
+ Some(Datum::Double(f64::NEG_INFINITY)),
+ DataType::Double(DoubleType::new()),
+ ),
+ (
+ Some(Datum::Double(f64::NAN)),
+ DataType::Double(DoubleType::new()),
+ ),
+ ];
+
+ assert_eq!(
+ format_value(&values).unwrap(),
+ "{9999999.0, 1.0E7, 0.001, 9.99999E-4, -0.0, Infinity, -Infinity,
NaN}"
+ );
+ }
+
+ // Non-UTF-8 binary is rendered lossily instead of failing.
+ #[test]
+ fn test_format_binary_invalid_utf8_uses_lossy_string() {
+ let bytes = vec![0xff];
+ let data_type = DataType::VarBinary(VarBinaryType::new(1).unwrap());
+ let expected = String::from_utf8_lossy(&bytes).into_owned();
+
+ assert_eq!(
+ format_value(&[(Some(Datum::Bytes(bytes)), data_type)]).unwrap(),
+ format!("{{{expected}}}")
+ );
+ }
+
+ #[test]
+ fn test_format_null_values() {
+ let values = vec![
+ (None, DataType::Int(IntType::new())),
+ (None, DataType::VarChar(VarCharType::string_type())),
+ ];
+
+ assert_eq!(format_value(&values).unwrap(), "{null, null}");
+ }
+
+ // Same instant at TIMESTAMP(3) and TIMESTAMP(6): the fraction length follows the type.
+ #[test]
+ fn test_format_timestamp_precision_matches_java_cast() {
+ let values = vec![
+ (
+ Some(Datum::Timestamp {
+ millis: 1_704_110_400_123,
+ nanos: 456_000,
+ }),
+ DataType::Timestamp(TimestampType::new(3).unwrap()),
+ ),
+ (
+ Some(Datum::Timestamp {
+ millis: 1_704_110_400_123,
+ nanos: 456_000,
+ }),
+ DataType::Timestamp(TimestampType::new(6).unwrap()),
+ ),
+ ];
+
+ assert_eq!(
+ format_value(&values).unwrap(),
+ "{2024-01-01 12:00:00.123, 2024-01-01 12:00:00.123456}"
+ );
+ }
+
+ // The expected string depends on the machine's time zone, so it is computed
+ // with the same chrono `Local` conversion.
+ #[test]
+ fn test_format_local_zoned_timestamp() {
+ let data_type =
DataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap());
+ let millis = 1_704_067_200_000;
+ let expected = Local
+ .timestamp_opt(millis / 1000, 0)
+ .single()
+ .map(|dt| format!("{}.000", dt.format("%Y-%m-%d %H:%M:%S")))
+ .unwrap();
+
+ assert_eq!(
+ format_value(&[(
+ Some(Datum::LocalZonedTimestamp { millis, nanos: 0 }),
+ data_type,
+ )])
+ .unwrap(),
+ format!("{{{expected}}}")
+ );
+ }
+
+ // `format_field`'s catch-all arm returns DataInvalid, so the Unsupported
+ // error asserted here presumably comes from `BinaryRow::get_datum`.
+ #[test]
+ fn test_format_unsupported_type_returns_error() {
+ let data_type = DataType::Blob(BlobType::new());
+ let err = format_value(&[(Some(Datum::Bytes(b"x".to_vec())),
data_type)])
+ .expect_err("blob row string cast should be unsupported");
+ assert!(matches!(err, Error::Unsupported { .. }));
+ }
+
+ // A one-field row against two fields must be rejected by `validate_row`.
+ #[test]
+ fn test_format_arity_mismatch_returns_error() {
+ let int_type = DataType::Int(IntType::new());
+ let row = row(&[(Some(Datum::Int(1)), int_type.clone())]);
+ let fields = vec![
+ field(0, int_type.clone()),
+ field(1, DataType::Int(IntType::new())),
+ ];
+ let err =
+ format_row_as_java_cast_string(&row, &fields).expect_err("arity
mismatch should fail");
+ assert!(err.to_string().contains("arity"));
+ }
+
+ // Four bytes are shorter than the fixed-length part that
+ // `cal_fix_part_size_in_bytes(1)` requires.
+ #[test]
+ fn test_format_truncated_row_returns_error() {
+ let row = BinaryRow::from_bytes(1, vec![0, 0, 0, 0]);
+ let fields = vec![field(0, DataType::Int(IntType::new()))];
+ let err =
+ format_row_as_java_cast_string(&row,
&fields).expect_err("truncated row should fail");
+ assert!(err.to_string().contains("too short"));
+ }
+}
diff --git a/crates/integrations/datafusion/tests/system_tables.rs
b/crates/integrations/datafusion/tests/system_tables.rs
index d39e14f..9d96e2f 100644
--- a/crates/integrations/datafusion/tests/system_tables.rs
+++ b/crates/integrations/datafusion/tests/system_tables.rs
@@ -17,6 +17,8 @@
//! Paimon system tables end-to-end via DataFusion SQL.
+mod common;
+
use std::sync::Arc;
use datafusion::arrow::array::{Array, Int64Array, StringArray};
@@ -545,3 +547,78 @@ async fn test_manifests_system_table() {
"$manifests rows should match base + delta + changelog manifest
entries of the latest snapshot"
);
}
+
+/// End-to-end check that `$manifests` exposes min/max partition stats for a
+/// table partitioned by a single INT column.
+#[tokio::test]
+async fn test_manifests_system_table_partition_stats() {
+ let (_tmp, sql_context) = common::setup_sql_context().await;
+ common::exec(
+ &sql_context,
+ "CREATE TABLE paimon.test_db.manifest_stats (id INT, pt INT)
PARTITIONED BY (pt)",
+ )
+ .await;
+ // Two rows, written into partitions pt=1 and pt=2.
+ common::exec(
+ &sql_context,
+ "INSERT INTO paimon.test_db.manifest_stats VALUES (1, 1), (2, 2)",
+ )
+ .await;
+
+ // Projecting only the stats columns also exercises the projection path.
+ let batches = sql_context
+ .sql(
+ "SELECT min_partition_stats, max_partition_stats \
+ FROM paimon.test_db.manifest_stats$manifests",
+ )
+ .await
+ .expect("$manifests query should plan")
+ .collect()
+ .await
+ .expect("$manifests query should execute");
+
+ // Collect (min, max) pairs from every batch; null cells become `None`.
+ let mut stats = Vec::new();
+ for batch in &batches {
+ let mins = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("min_partition_stats is Utf8");
+ let maxs = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .expect("max_partition_stats is Utf8");
+ for row in 0..batch.num_rows() {
+ stats.push((
+ (!mins.is_null(row)).then(|| mins.value(row).to_string()),
+ (!maxs.is_null(row)).then(|| maxs.value(row).to_string()),
+ ));
+ }
+ }
+ // Deterministic order for debugging; the assertions below are order-independent.
+ stats.sort();
+
+ assert!(
+ !stats.is_empty(),
+ "$manifests should return partition stats"
+ );
+
+ // Across all manifests, the smallest min and the largest max must span
+ // the written partitions 1..=2.
+ let min_partition = stats
+ .iter()
+ .filter_map(|(min, _)| min.as_deref())
+ .map(single_int_partition_stat)
+ .min();
+ let max_partition = stats
+ .iter()
+ .filter_map(|(_, max)| max.as_deref())
+ .map(single_int_partition_stat)
+ .max();
+
+ assert_eq!(min_partition, Some(1));
+ assert_eq!(max_partition, Some(2));
+}
+
+/// Parses a single-field partition stat such as `{42}` into its integer.
+///
+/// Panics with a descriptive message when the braces are missing or the
+/// content is not an `i32`.
+fn single_int_partition_stat(value: &str) -> i32 {
+    let inner = value
+        .strip_prefix('{')
+        .and_then(|rest| rest.strip_suffix('}'))
+        .expect("partition stats should use row cast braces");
+    inner
+        .parse::<i32>()
+        .expect("partition stats should contain one int partition value")
+}
diff --git a/docs/src/sql.md b/docs/src/sql.md
index e8d4518..1d745d5 100644
--- a/docs/src/sql.md
+++ b/docs/src/sql.md
@@ -766,8 +766,8 @@ Columns:
| `num_added_files` | BIGINT | Number of added data files |
| `num_deleted_files` | BIGINT | Number of deleted data files |
| `schema_id` | BIGINT | Schema ID |
-| `min_partition_stats` | STRING | Min partition values |
-| `max_partition_stats` | STRING | Max partition values |
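+| `min_partition_stats` | STRING | Minimum partition values of the manifest, rendered like Paimon Java's row-to-string cast (e.g. `{1, a}`; null fields print as `null`) |
+| `max_partition_stats` | STRING | Maximum partition values of the manifest, rendered like Paimon Java's row-to-string cast (e.g. `{2, b}`; null fields print as `null`) |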
| `min_row_id` | BIGINT | Minimum row id covered (when row tracking is
enabled) |
| `max_row_id` | BIGINT | Maximum row id covered (when row tracking is
enabled) |