This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 36bd7eb3 feat(spec): add `table_properties.rs` to spec (#1733)
36bd7eb3 is described below

commit 36bd7eb36828be24acf08e97e2b2ff8e8e83d6f4
Author: Kaushik Srinivasan <[email protected]>
AuthorDate: Tue Oct 14 06:16:32 2025 -0400

    feat(spec): add `table_properties.rs` to spec (#1733)
    
    ## Which issue does this PR close?
    
    
    
    - Closes #1505 .
    
    ## What changes are included in this PR?
    
    
    - Adds `table_properties.rs` to hold and validate properties and set
    default values. Uses a generic `parse_property` helper to simplify
    adding new properties.
    
    ## Are these changes tested?
    
    Yes
---
 Cargo.toml                                         |   4 +-
 crates/iceberg/Cargo.toml                          |   2 +-
 crates/iceberg/src/spec/mod.rs                     |   2 +
 crates/iceberg/src/spec/table_metadata.rs          |  85 ------
 crates/iceberg/src/spec/table_metadata_builder.rs  |  23 +-
 crates/iceberg/src/spec/table_properties.rs        | 284 +++++++++++++++++++++
 crates/iceberg/src/transaction/mod.rs              |  69 +----
 crates/iceberg/src/transaction/snapshot.rs         |  13 +-
 .../src/writer/file_writer/rolling_writer.rs       |   4 +-
 .../datafusion/src/physical_plan/write.rs          |  14 +-
 10 files changed, 327 insertions(+), 173 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 999b9117..46c99cc3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -76,10 +76,10 @@ futures = "0.3"
 hive_metastore = "0.2.0"
 http = "1.2"
 iceberg = { version = "0.7.0", path = "./crates/iceberg" }
-iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
 iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" }
-iceberg-catalog-s3tables = { version = "0.7.0", path = 
"./crates/catalog/s3tables" }
 iceberg-catalog-hms = { version = "0.7.0", path = "./crates/catalog/hms" }
+iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
+iceberg-catalog-s3tables = { version = "0.7.0", path = 
"./crates/catalog/s3tables" }
 iceberg-datafusion = { version = "0.7.0", path = 
"./crates/integrations/datafusion" }
 indicatif = "0.17"
 itertools = "0.13"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d592700b..b831607a 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -74,8 +74,8 @@ opendal = { workspace = true }
 ordered-float = { workspace = true }
 parquet = { workspace = true, features = ["async"] }
 rand = { workspace = true }
-reqwest = { workspace = true }
 reqsign = { version = "0.16.3", optional = true, default-features = false }
+reqwest = { workspace = true }
 roaring = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 90dafcc1..44b35e5a 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -30,6 +30,7 @@ mod sort;
 mod statistic_file;
 mod table_metadata;
 mod table_metadata_builder;
+mod table_properties;
 mod transform;
 mod values;
 mod view_metadata;
@@ -48,6 +49,7 @@ pub use snapshot_summary::*;
 pub use sort::*;
 pub use statistic_file::*;
 pub use table_metadata::*;
+pub use table_properties::*;
 pub use transform::*;
 pub use values::*;
 pub use view_metadata::*;
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index d7347d50..ca298f30 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -46,91 +46,6 @@ pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
 pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
 pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
 
-/// Reserved table property for table format version.
-///
-/// Iceberg will default a new table's format version to the latest stable and 
recommended
-/// version. This reserved property keyword allows users to override the 
Iceberg format version of
-/// the table metadata.
-///
-/// If this table property exists when creating a table, the table will use 
the specified format
-/// version. If a table updates this property, it will try to upgrade to the 
specified format
-/// version.
-pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
-/// Reserved table property for table UUID.
-pub const PROPERTY_UUID: &str = "uuid";
-/// Reserved table property for the total number of snapshots.
-pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
-/// Reserved table property for current snapshot summary.
-pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
-/// Reserved table property for current snapshot id.
-pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
-/// Reserved table property for current snapshot timestamp.
-pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = 
"current-snapshot-timestamp-ms";
-/// Reserved table property for the JSON representation of current schema.
-pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
-/// Reserved table property for the JSON representation of current(default) 
partition spec.
-pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
-/// Reserved table property for the JSON representation of current(default) 
sort order.
-pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
-
-/// Property key for max number of previous versions to keep.
-pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = 
"write.metadata.previous-versions-max";
-/// Default value for max number of previous versions to keep.
-pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
-
-/// Property key for max number of partitions to keep summary stats for.
-pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = 
"write.summary.partition-limit";
-/// Default value for the max number of partitions to keep summary stats for.
-pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
-
-/// Reserved Iceberg table properties list.
-///
-/// Reserved table properties are only used to control behaviors when creating 
or updating a
-/// table. The value of these properties are not persisted as a part of the 
table metadata.
-pub const RESERVED_PROPERTIES: [&str; 9] = [
-    PROPERTY_FORMAT_VERSION,
-    PROPERTY_UUID,
-    PROPERTY_SNAPSHOT_COUNT,
-    PROPERTY_CURRENT_SNAPSHOT_ID,
-    PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
-    PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
-    PROPERTY_CURRENT_SCHEMA,
-    PROPERTY_DEFAULT_PARTITION_SPEC,
-    PROPERTY_DEFAULT_SORT_ORDER,
-];
-
-/// Property key for number of commit retries.
-pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
-/// Default value for number of commit retries.
-pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
-
-/// Property key for minimum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
-/// Default value for minimum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
-
-/// Property key for maximum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
-/// Default value for maximum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 
minute
-
-/// Property key for total maximum retry time (ms).
-pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = 
"commit.retry.total-timeout-ms";
-/// Default value for total maximum retry time (ms).
-pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; 
// 30 minutes
-
-/// Default file format for data files
-pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
-/// Default file format for delete files
-pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = 
"write.delete.format.default";
-/// Default value for data file format
-pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
-
-/// Target file size for newly written files.
-pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = 
"write.target-file-size-bytes";
-/// Default target file size
-pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 
1024; // 512 MB
-
 /// Reference to [`TableMetadata`].
 pub type TableMetadataRef = Arc<TableMetadata>;
 
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs 
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 068f2002..7881ebea 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -22,11 +22,10 @@ use uuid::Uuid;
 
 use super::{
     DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, 
MetadataLog,
-    ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX,
-    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, PartitionSpec, 
PartitionSpecBuilder,
-    PartitionStatisticsFile, RESERVED_PROPERTIES, Schema, SchemaRef, Snapshot, 
SnapshotLog,
-    SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, 
StatisticsFile, StructType,
-    TableMetadata, UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec,
+    ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, 
PartitionStatisticsFile, Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, 
SortOrderRef,
+    StatisticsFile, StructType, TableMetadata, TableProperties, 
UNPARTITIONED_LAST_ASSIGNED_ID,
+    UnboundPartitionSpec,
 };
 use crate::error::{Error, ErrorKind, Result};
 use crate::{TableCreation, TableUpdate};
@@ -247,7 +246,7 @@ impl TableMetadataBuilder {
         // List of specified properties that are RESERVED and should not be 
persisted.
         let reserved_properties = properties
             .keys()
-            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .filter(|key| 
TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
             .map(ToString::to_string)
             .collect::<Vec<_>>();
 
@@ -285,7 +284,7 @@ impl TableMetadataBuilder {
         // disallow removal of reserved properties
         let reserved_properties = properties
             .iter()
-            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .filter(|key| 
TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
             .map(ToString::to_string)
             .collect::<Vec<_>>();
 
@@ -1061,9 +1060,9 @@ impl TableMetadataBuilder {
         let max_size = self
             .metadata
             .properties
-            .get(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
+            .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
             .and_then(|v| v.parse::<usize>().ok())
-            .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
+            
.unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
             .max(1);
 
         if self.metadata.metadata_log.len() > max_size {
@@ -1360,8 +1359,8 @@ mod tests {
     use crate::io::FileIOBuilder;
     use crate::spec::{
         BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, 
PrimitiveType, Schema,
-        SnapshotRetention, SortDirection, SortField, StructType, Summary, 
Transform, Type,
-        UnboundPartitionField,
+        SnapshotRetention, SortDirection, SortField, StructType, Summary, 
TableProperties,
+        Transform, Type, UnboundPartitionField,
     };
     use crate::table::Table;
 
@@ -2299,7 +2298,7 @@ mod tests {
         let builder = builder_without_changes(FormatVersion::V2);
         let metadata = builder
             .set_properties(HashMap::from_iter(vec![(
-                PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
+                
TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
                 "2".to_string(),
             )]))
             .unwrap()
diff --git a/crates/iceberg/src/spec/table_properties.rs 
b/crates/iceberg/src/spec/table_properties.rs
new file mode 100644
index 00000000..9aa789fe
--- /dev/null
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+// Parses the value stored under `key` as `T`, or returns `default` when `key` is absent.
+// A present value that fails to parse becomes an error naming `key` and the parse failure.
+fn parse_property<T: std::str::FromStr>(
+    properties: &HashMap<String, String>,
+    key: &str,
+    default: T,
+) -> Result<T, anyhow::Error>
+where
+    <T as std::str::FromStr>::Err: std::fmt::Display,
+{
+    properties.get(key).map_or(Ok(default), |value| {
+        value
+            .parse::<T>()
+            .map_err(|e| anyhow::anyhow!("Invalid value for {}: {}", key, e))
+    })
+}
+
+/// Typed values of selected table properties, parsed from a table's string property map.
+#[derive(Debug)]
+pub struct TableProperties {
+    /// The number of times to retry a commit (`commit.retry.num-retries`).
+    pub commit_num_retries: usize,
+    /// The minimum wait time between commit retries, in milliseconds.
+    pub commit_min_retry_wait_ms: u64,
+    /// The maximum wait time between commit retries, in milliseconds.
+    pub commit_max_retry_wait_ms: u64,
+    /// The total timeout for commit retries, in milliseconds.
+    pub commit_total_retry_timeout_ms: u64,
+    /// The default file format for data files (`write.format.default`).
+    pub write_format_default: String,
+    /// The target size, in bytes, of newly written files.
+    pub write_target_file_size_bytes: usize,
+}
+
+impl TableProperties {
+    /// Reserved table property for table format version.
+    ///
+    /// Iceberg will default a new table's format version to the latest stable 
and recommended
+    /// version. This reserved property keyword allows users to override the 
Iceberg format version of
+    /// the table metadata.
+    ///
+    /// If this table property exists when creating a table, the table will 
use the specified format
+    /// version. If a table updates this property, it will try to upgrade to 
the specified format
+    /// version.
+    pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
+    /// Reserved table property for table UUID.
+    pub const PROPERTY_UUID: &str = "uuid";
+    /// Reserved table property for the total number of snapshots.
+    pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
+    /// Reserved table property for current snapshot summary.
+    pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = 
"current-snapshot-summary";
+    /// Reserved table property for current snapshot id.
+    pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
+    /// Reserved table property for current snapshot timestamp.
+    pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = 
"current-snapshot-timestamp-ms";
+    /// Reserved table property for the JSON representation of current schema.
+    pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
+    /// Reserved table property for the JSON representation of 
current(default) partition spec.
+    pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
+    /// Reserved table property for the JSON representation of 
current(default) sort order.
+    pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
+
+    /// Property key for max number of previous versions to keep.
+    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
+        "write.metadata.previous-versions-max";
+    /// Default value for max number of previous versions to keep.
+    pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
+
+    /// Property key for max number of partitions to keep summary stats for.
+    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = 
"write.summary.partition-limit";
+    /// Default value for the max number of partitions to keep summary stats 
for.
+    pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
+
+    /// Reserved Iceberg table properties list.
+    ///
+    /// Reserved table properties are only used to control behaviors when 
creating or updating a
+    /// table. The value of these properties are not persisted as a part of 
the table metadata.
+    pub const RESERVED_PROPERTIES: [&str; 9] = [
+        Self::PROPERTY_FORMAT_VERSION,
+        Self::PROPERTY_UUID,
+        Self::PROPERTY_SNAPSHOT_COUNT,
+        Self::PROPERTY_CURRENT_SNAPSHOT_ID,
+        Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
+        Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
+        Self::PROPERTY_CURRENT_SCHEMA,
+        Self::PROPERTY_DEFAULT_PARTITION_SPEC,
+        Self::PROPERTY_DEFAULT_SORT_ORDER,
+    ];
+
+    /// Property key for number of commit retries.
+    pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
+    /// Default value for number of commit retries.
+    pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
+
+    /// Property key for minimum wait time (ms) between retries.
+    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = 
"commit.retry.min-wait-ms";
+    /// Default value for minimum wait time (ms) between retries.
+    pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
+
+    /// Property key for maximum wait time (ms) between retries.
+    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = 
"commit.retry.max-wait-ms";
+    /// Default value for maximum wait time (ms) between retries.
+    pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 
minute
+
+    /// Property key for total maximum retry time (ms).
+    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = 
"commit.retry.total-timeout-ms";
+    /// Default value for total maximum retry time (ms).
+    pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 
1000; // 30 minutes
+
+    /// Property key for the default file format of data files.
+    pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
+    /// Property key for the default file format of delete files.
+    pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = 
"write.delete.format.default";
+    /// Default value for the data file format.
+    pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
+
+    /// Property key for the target size (bytes) of newly written files.
+    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = 
"write.target-file-size-bytes";
+    /// Default target file size, in bytes.
+    pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 
1024 * 1024; // 512 MB
+}
+
+impl TryFrom<&HashMap<String, String>> for TableProperties {
+    // Each field is parsed from its property key; an absent key falls back to its default.
+    type Error = anyhow::Error;
+
+    fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
+        Ok(TableProperties {
+            commit_num_retries: parse_property(
+                props,
+                TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
+                TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
+            )?,
+            commit_min_retry_wait_ms: parse_property(
+                props,
+                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
+                TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+            )?,
+            commit_max_retry_wait_ms: parse_property(
+                props,
+                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
+                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+            )?,
+            commit_total_retry_timeout_ms: parse_property(
+                props,
+                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
+                TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+            )?,
+            write_format_default: parse_property(
+                props,
+                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
+                
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
+            )?,
+            write_target_file_size_bytes: parse_property(
+                props,
+                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
+                TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+            )?,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_table_properties_default() {
+        let props = HashMap::new();
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(
+            table_properties.commit_num_retries,
+            TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
+        );
+        assert_eq!(
+            table_properties.commit_min_retry_wait_ms,
+            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
+        );
+        assert_eq!(
+            table_properties.commit_max_retry_wait_ms,
+            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
+        );
+        assert_eq!(
+            table_properties.write_format_default,
+            TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
+        );
+        assert_eq!(
+            table_properties.write_target_file_size_bytes,
+            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+        );
+    }
+
+    #[test]
+    fn test_table_properties_valid() {
+        let props = HashMap::from([
+            (
+                TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+                "10".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
+                "20".to_string(),
+            ),
+            (
+                TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
+                "avro".to_string(),
+            ),
+            (
+                
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+                "512".to_string(),
+            ),
+        ]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(table_properties.commit_num_retries, 10);
+        assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
+        assert_eq!(table_properties.write_format_default, "avro".to_string());
+        assert_eq!(table_properties.write_target_file_size_bytes, 512);
+    }
+
+    #[test]
+    fn test_table_properties_invalid() {
+        let invalid_retries = HashMap::from([(
+            TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+            "abc".to_string(),
+        )]);
+
+        let table_properties = 
TableProperties::try_from(&invalid_retries).unwrap_err();
+        assert!(
+            table_properties.to_string().contains(
+                "Invalid value for commit.retry.num-retries: invalid digit 
found in string"
+            )
+        );
+
+        let invalid_min_wait = HashMap::from([(
+            TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
+            "abc".to_string(),
+        )]);
+        let table_properties = 
TableProperties::try_from(&invalid_min_wait).unwrap_err();
+        assert!(
+            table_properties.to_string().contains(
+                "Invalid value for commit.retry.min-wait-ms: invalid digit 
found in string"
+            )
+        );
+
+        let invalid_max_wait = HashMap::from([(
+            TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
+            "abc".to_string(),
+        )]);
+        let table_properties = 
TableProperties::try_from(&invalid_max_wait).unwrap_err();
+        assert!(
+            table_properties.to_string().contains(
+                "Invalid value for commit.retry.max-wait-ms: invalid digit 
found in string"
+            )
+        );
+
+        let invalid_target_size = HashMap::from([(
+            TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+            "abc".to_string(),
+        )]);
+        let table_properties = 
TableProperties::try_from(&invalid_target_size).unwrap_err();
+        assert!(table_properties.to_string().contains(
+            "Invalid value for write.target-file-size-bytes: invalid digit 
found in string"
+        ));
+    }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs 
b/crates/iceberg/src/transaction/mod.rs
index 06549a95..26bd6522 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -52,8 +52,6 @@
 /// that allows users to apply a transaction action to a `Transaction`.
 mod action;
 
-use std::collections::HashMap;
-
 pub use action::*;
 mod append;
 mod snapshot;
@@ -69,12 +67,7 @@ use std::time::Duration;
 use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, 
RetryableWithContext};
 
 use crate::error::Result;
-use crate::spec::{
-    PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, 
PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
-    PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, 
PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
-    PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
-    PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, 
PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
-};
+use crate::spec::TableProperties;
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
@@ -170,7 +163,12 @@ impl Transaction {
             return Ok(self.table);
         }
 
-        let backoff = Self::build_backoff(self.table.metadata().properties())?;
+        let table_props =
+            
TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
+                Error::new(ErrorKind::DataInvalid, "Invalid table 
properties").with_source(e)
+            })?;
+
+        let backoff = Self::build_backoff(table_props)?;
         let tx = self;
 
         (|mut tx: Transaction| async {
@@ -185,53 +183,14 @@ impl Transaction {
         .1
     }
 
-    fn build_backoff(props: &HashMap<String, String>) -> 
Result<ExponentialBackoff> {
-        let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) {
-            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    "Invalid value for commit.retry.min-wait-ms",
-                )
-                .with_source(e)
-            })?,
-            None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
-        };
-        let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) {
-            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    "Invalid value for commit.retry.max-wait-ms",
-                )
-                .with_source(e)
-            })?,
-            None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
-        };
-        let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) 
{
-            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    "Invalid value for commit.retry.total-timeout-ms",
-                )
-                .with_source(e)
-            })?,
-            None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
-        };
-        let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) {
-            Some(value_str) => value_str.parse::<usize>().map_err(|e| {
-                Error::new(
-                    ErrorKind::DataInvalid,
-                    "Invalid value for commit.retry.num-retries",
-                )
-                .with_source(e)
-            })?,
-            None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
-        };
-
+    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
         Ok(ExponentialBuilder::new()
-            .with_min_delay(Duration::from_millis(min_delay))
-            .with_max_delay(Duration::from_millis(max_delay))
-            .with_total_delay(Some(Duration::from_millis(total_delay)))
-            .with_max_times(max_times)
+            
.with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
+            
.with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
+            .with_total_delay(Some(Duration::from_millis(
+                props.commit_total_retry_timeout_ms,
+            )))
+            .with_max_times(props.commit_num_retries)
             .with_factor(2.0)
             .build())
     }
diff --git a/crates/iceberg/src/transaction/snapshot.rs 
b/crates/iceberg/src/transaction/snapshot.rs
index 48dc2b5b..a03b8dc4 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -24,10 +24,9 @@ use uuid::Uuid;
 use crate::error::Result;
 use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, 
ManifestEntry,
-    ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, 
Operation,
-    PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, 
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
-    Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, 
Struct, StructType,
-    Summary, update_snapshot_summaries,
+    ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, 
Operation, Snapshot,
+    SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, 
StructType, Summary,
+    TableProperties, update_snapshot_summaries,
 };
 use crate::table::Table;
 use crate::transaction::ActionCommit;
@@ -322,15 +321,15 @@ impl<'a> SnapshotProducer<'a> {
 
         let partition_summary_limit = if let Some(limit) = table_metadata
             .properties()
-            .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
+            .get(TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
         {
             if let Ok(limit) = limit.parse::<u64>() {
                 limit
             } else {
-                PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+                TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
             }
         } else {
-            PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+            TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
         };
 
         summary_collector.set_partition_summary_limit(partition_summary_limit);
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs 
b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
index 0b9b105c..00036170 100644
--- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter};
 use arrow_array::RecordBatch;
 
 use crate::io::{FileIO, OutputFile};
-use crate::spec::{DataFileBuilder, 
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey};
+use crate::spec::{DataFileBuilder, PartitionKey, TableProperties};
 use crate::writer::CurrentFileStatus;
 use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
 use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
@@ -95,7 +95,7 @@ where
     ) -> Self {
         Self {
             inner_builder,
-            target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+            target_file_size: 
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
             file_io,
             location_generator,
             file_name_generator,
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs 
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 759f1a8d..7bff2392 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -36,11 +36,7 @@ use datafusion::physical_plan::{
 };
 use futures::StreamExt;
 use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
-use iceberg::spec::{
-    DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, 
PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT,
-    PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, 
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
-    serialize_data_file_to_json,
-};
+use iceberg::spec::{DataFileFormat, TableProperties, 
serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -226,8 +222,8 @@ impl ExecutionPlan for IcebergWriteExec {
             self.table
                 .metadata()
                 .properties()
-                .get(PROPERTY_DEFAULT_FILE_FORMAT)
-                .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
+                .get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT)
+                
.unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
         )
         .map_err(to_datafusion_error)?;
         if file_format != DataFileFormat::Parquet {
@@ -250,7 +246,7 @@ impl ExecutionPlan for IcebergWriteExec {
             .table
             .metadata()
             .properties()
-            .get(PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
+            .get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
         {
             Some(value_str) => value_str
                 .parse::<usize>()
@@ -262,7 +258,7 @@ impl ExecutionPlan for IcebergWriteExec {
                     .with_source(e)
                 })
                 .map_err(to_datafusion_error)?,
-            None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+            None => 
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
         };
 
         let file_io = self.table.file_io().clone();

Reply via email to