This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a169c470f feat(spec): add table_properties() method to TableMetadata (#2002)
a169c470f is described below
commit a169c470f637e97dd07b82fb19d3537d62d7ca51
Author: Nathan Metzger <[email protected]>
AuthorDate: Sat Jan 10 02:37:18 2026 +0100
feat(spec): add table_properties() method to TableMetadata (#2002)
## Summary
Add a type-safe accessor method `table_properties()` to `TableMetadata`
that returns `TableProperties`, eliminating manual `TryFrom` conversions
at call sites.
### Changes
- Add `table_properties()` method to `TableMetadata` that parses and
returns typed `TableProperties`
- Update `transaction/mod.rs` to use the new method
- Simplify `datafusion/write.rs` property access (~60 lines reduced to
~10)
- Add unit tests for the new accessor method
Closes #1878
---
crates/iceberg/src/spec/table_metadata.rs | 119 ++++++++++++++++++++-
crates/iceberg/src/transaction/mod.rs | 7 +-
.../datafusion/src/physical_plan/write.rs | 61 +++--------
3 files changed, 132 insertions(+), 55 deletions(-)
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index cfa25decc..585cb3e2b 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -37,6 +37,7 @@ pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
use super::{
DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile,
SchemaId, SchemaRef,
SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile,
StructType,
+ TableProperties,
};
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
@@ -360,6 +361,13 @@ impl TableMetadata {
&self.properties
}
+    /// Returns typed table properties parsed from the raw properties map with defaults.
+    pub fn table_properties(&self) -> Result<TableProperties> {
+        TableProperties::try_from(&self.properties).map_err(|e| {
+            Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
+        })
+    }
+
/// Return location of statistics files.
#[inline]
pub fn statistics_iter(&self) -> impl ExactSizeIterator<Item = &StatisticsFile> {
@@ -1561,7 +1569,6 @@ mod tests {
use uuid::Uuid;
use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
- use crate::TableCreation;
use crate::io::FileIOBuilder;
use crate::spec::table_metadata::TableMetadata;
use crate::spec::{
@@ -1570,6 +1577,7 @@ mod tests {
SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder, StatisticsFile,
Summary, Transform, Type, UnboundPartitionField,
};
+ use crate::{ErrorKind, TableCreation};
fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
let desered_type: TableMetadata = serde_json::from_str(json).unwrap();
@@ -3861,4 +3869,113 @@ mod tests {
"Parsing should fail for sort order ID 0 with fields"
);
}
+
+ #[test]
+ fn test_table_properties_with_defaults() {
+ use crate::spec::TableProperties;
+
+ let schema = Schema::builder()
+ .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let metadata = TableMetadataBuilder::new(
+ schema,
+ PartitionSpec::unpartition_spec().into_unbound(),
+ SortOrder::unsorted_order(),
+ "s3://test/location".to_string(),
+ FormatVersion::V2,
+ HashMap::new(),
+ )
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ let props = metadata.table_properties().unwrap();
+
+ assert_eq!(
+ props.commit_num_retries,
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
+ );
+ assert_eq!(
+ props.write_target_file_size_bytes,
+ TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+ );
+ }
+
+ #[test]
+ fn test_table_properties_with_custom_values() {
+ use crate::spec::TableProperties;
+
+ let schema = Schema::builder()
+ .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let properties = HashMap::from([
+ (
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+ "10".to_string(),
+ ),
+            (
+                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+                "1024".to_string(),
+            ),
+ ]);
+
+ let metadata = TableMetadataBuilder::new(
+ schema,
+ PartitionSpec::unpartition_spec().into_unbound(),
+ SortOrder::unsorted_order(),
+ "s3://test/location".to_string(),
+ FormatVersion::V2,
+ properties,
+ )
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ let props = metadata.table_properties().unwrap();
+
+ assert_eq!(props.commit_num_retries, 10);
+ assert_eq!(props.write_target_file_size_bytes, 1024);
+ }
+
+ #[test]
+ fn test_table_properties_with_invalid_value() {
+ let schema = Schema::builder()
+ .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let properties = HashMap::from([(
+ "commit.retry.num-retries".to_string(),
+ "not_a_number".to_string(),
+ )]);
+
+ let metadata = TableMetadataBuilder::new(
+ schema,
+ PartitionSpec::unpartition_spec().into_unbound(),
+ SortOrder::unsorted_order(),
+ "s3://test/location".to_string(),
+ FormatVersion::V2,
+ properties,
+ )
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
+
+ let err = metadata.table_properties().unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::DataInvalid);
+ assert!(err.message().contains("Invalid table properties"));
+ }
}
diff --git a/crates/iceberg/src/transaction/mod.rs
b/crates/iceberg/src/transaction/mod.rs
index 8ddaa2669..074c7fefe 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -76,7 +76,7 @@ use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
use crate::transaction::update_statistics::UpdateStatisticsAction;
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
-use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
+use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
/// Table transaction.
#[derive(Clone)]
@@ -163,10 +163,7 @@ impl Transaction {
return Ok(self.table);
}
-        let table_props =
-            TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
-                Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
-            })?;
+ let table_props = self.table.metadata().table_properties()?;
let backoff = Self::build_backoff(table_props)?;
let tx = self;
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs
b/crates/integrations/datafusion/src/physical_plan/write.rs
index fdfddf877..0dea150d3 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -36,7 +36,7 @@ use datafusion::physical_plan::{
};
use futures::StreamExt;
use iceberg::arrow::FieldMatchMode;
-use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
+use iceberg::spec::{DataFileFormat, serialize_data_file_to_json};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -208,15 +208,16 @@ impl ExecutionPlan for IcebergWriteExec {
let partition_type =
self.table.metadata().default_partition_type().clone();
let format_version = self.table.metadata().format_version();
+ // Get typed table properties
+ let table_props = self
+ .table
+ .metadata()
+ .table_properties()
+ .map_err(to_datafusion_error)?;
+
// Check data file format
- let file_format = DataFileFormat::from_str(
- self.table
- .metadata()
- .properties()
- .get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT)
-                .unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
-        )
-        .map_err(to_datafusion_error)?;
+        let file_format = DataFileFormat::from_str(&table_props.write_format_default)
+            .map_err(to_datafusion_error)?;
if file_format != DataFileFormat::Parquet {
return Err(to_datafusion_error(Error::new(
ErrorKind::FeatureUnsupported,
@@ -230,24 +231,7 @@ impl ExecutionPlan for IcebergWriteExec {
self.table.metadata().current_schema().clone(),
FieldMatchMode::Name,
);
- let target_file_size = match self
- .table
- .metadata()
- .properties()
- .get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
- {
- Some(value_str) => value_str
- .parse::<usize>()
- .map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- "Invalid value for write.target-file-size-bytes",
- )
- .with_source(e)
- })
- .map_err(to_datafusion_error)?,
-            None => TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
- };
+ let target_file_size = table_props.write_target_file_size_bytes;
let file_io = self.table.file_io().clone();
// todo location_gen and file_name_gen should be configurable
@@ -266,28 +250,7 @@ impl ExecutionPlan for IcebergWriteExec {
let data_file_writer_builder =
DataFileWriterBuilder::new(rolling_writer_builder);
// Create TaskWriter
- let fanout_enabled = self
- .table
- .metadata()
- .properties()
- .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
- .map(|value| {
- value
- .parse::<bool>()
- .map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- format!(
-                                    "Invalid value for {}, expected 'true' or 'false'",
-                                    TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
- ),
- )
- .with_source(e)
- })
- .map_err(to_datafusion_error)
- })
- .transpose()?
-            .unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
+ let fanout_enabled = table_props.write_datafusion_fanout_enabled;
let schema = self.table.metadata().current_schema().clone();
let partition_spec =
self.table.metadata().default_partition_spec().clone();
let task_writer = TaskWriter::try_new(