This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 36bd7eb3 feat(spec): add `table_properties.rs` to spec (#1733)
36bd7eb3 is described below
commit 36bd7eb36828be24acf08e97e2b2ff8e8e83d6f4
Author: Kaushik Srinivasan <[email protected]>
AuthorDate: Tue Oct 14 06:16:32 2025 -0400
feat(spec): add `table_properties.rs` to spec (#1733)
## Which issue does this PR close?
- Closes #1505.
## What changes are included in this PR?
- Adds `table_properties.rs` to hold and validate table properties and set
default values. Uses a generic `parse_property` helper to simplify adding new properties.
## Are these changes tested?
Yes
---
Cargo.toml | 4 +-
crates/iceberg/Cargo.toml | 2 +-
crates/iceberg/src/spec/mod.rs | 2 +
crates/iceberg/src/spec/table_metadata.rs | 85 ------
crates/iceberg/src/spec/table_metadata_builder.rs | 23 +-
crates/iceberg/src/spec/table_properties.rs | 284 +++++++++++++++++++++
crates/iceberg/src/transaction/mod.rs | 69 +----
crates/iceberg/src/transaction/snapshot.rs | 13 +-
.../src/writer/file_writer/rolling_writer.rs | 4 +-
.../datafusion/src/physical_plan/write.rs | 14 +-
10 files changed, 327 insertions(+), 173 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 999b9117..46c99cc3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -76,10 +76,10 @@ futures = "0.3"
hive_metastore = "0.2.0"
http = "1.2"
iceberg = { version = "0.7.0", path = "./crates/iceberg" }
-iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" }
-iceberg-catalog-s3tables = { version = "0.7.0", path =
"./crates/catalog/s3tables" }
iceberg-catalog-hms = { version = "0.7.0", path = "./crates/catalog/hms" }
+iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" }
+iceberg-catalog-s3tables = { version = "0.7.0", path =
"./crates/catalog/s3tables" }
iceberg-datafusion = { version = "0.7.0", path =
"./crates/integrations/datafusion" }
indicatif = "0.17"
itertools = "0.13"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d592700b..b831607a 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -74,8 +74,8 @@ opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
rand = { workspace = true }
-reqwest = { workspace = true }
reqsign = { version = "0.16.3", optional = true, default-features = false }
+reqwest = { workspace = true }
roaring = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 90dafcc1..44b35e5a 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -30,6 +30,7 @@ mod sort;
mod statistic_file;
mod table_metadata;
mod table_metadata_builder;
+mod table_properties;
mod transform;
mod values;
mod view_metadata;
@@ -48,6 +49,7 @@ pub use snapshot_summary::*;
pub use sort::*;
pub use statistic_file::*;
pub use table_metadata::*;
+pub use table_properties::*;
pub use transform::*;
pub use values::*;
pub use view_metadata::*;
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index d7347d50..ca298f30 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -46,91 +46,6 @@ pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
-/// Reserved table property for table format version.
-///
-/// Iceberg will default a new table's format version to the latest stable and
recommended
-/// version. This reserved property keyword allows users to override the
Iceberg format version of
-/// the table metadata.
-///
-/// If this table property exists when creating a table, the table will use
the specified format
-/// version. If a table updates this property, it will try to upgrade to the
specified format
-/// version.
-pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
-/// Reserved table property for table UUID.
-pub const PROPERTY_UUID: &str = "uuid";
-/// Reserved table property for the total number of snapshots.
-pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
-/// Reserved table property for current snapshot summary.
-pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary";
-/// Reserved table property for current snapshot id.
-pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
-/// Reserved table property for current snapshot timestamp.
-pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str =
"current-snapshot-timestamp-ms";
-/// Reserved table property for the JSON representation of current schema.
-pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
-/// Reserved table property for the JSON representation of current(default)
partition spec.
-pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
-/// Reserved table property for the JSON representation of current(default)
sort order.
-pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
-
-/// Property key for max number of previous versions to keep.
-pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
"write.metadata.previous-versions-max";
-/// Default value for max number of previous versions to keep.
-pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
-
-/// Property key for max number of partitions to keep summary stats for.
-pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str =
"write.summary.partition-limit";
-/// Default value for the max number of partitions to keep summary stats for.
-pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
-
-/// Reserved Iceberg table properties list.
-///
-/// Reserved table properties are only used to control behaviors when creating
or updating a
-/// table. The value of these properties are not persisted as a part of the
table metadata.
-pub const RESERVED_PROPERTIES: [&str; 9] = [
- PROPERTY_FORMAT_VERSION,
- PROPERTY_UUID,
- PROPERTY_SNAPSHOT_COUNT,
- PROPERTY_CURRENT_SNAPSHOT_ID,
- PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
- PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
- PROPERTY_CURRENT_SCHEMA,
- PROPERTY_DEFAULT_PARTITION_SPEC,
- PROPERTY_DEFAULT_SORT_ORDER,
-];
-
-/// Property key for number of commit retries.
-pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
-/// Default value for number of commit retries.
-pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
-
-/// Property key for minimum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
-/// Default value for minimum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
-
-/// Property key for maximum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
-/// Default value for maximum wait time (ms) between retries.
-pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1
minute
-
-/// Property key for total maximum retry time (ms).
-pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str =
"commit.retry.total-timeout-ms";
-/// Default value for total maximum retry time (ms).
-pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000;
// 30 minutes
-
-/// Default file format for data files
-pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
-/// Default file format for delete files
-pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str =
"write.delete.format.default";
-/// Default value for data file format
-pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
-
-/// Target file size for newly written files.
-pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str =
"write.target-file-size-bytes";
-/// Default target file size
-pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 *
1024; // 512 MB
-
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs
b/crates/iceberg/src/spec/table_metadata_builder.rs
index 068f2002..7881ebea 100644
--- a/crates/iceberg/src/spec/table_metadata_builder.rs
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -22,11 +22,10 @@ use uuid::Uuid;
use super::{
DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH,
MetadataLog,
- ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX,
- PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, PartitionSpec,
PartitionSpecBuilder,
- PartitionStatisticsFile, RESERVED_PROPERTIES, Schema, SchemaRef, Snapshot,
SnapshotLog,
- SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef,
StatisticsFile, StructType,
- TableMetadata, UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec,
+ ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder,
PartitionStatisticsFile, Schema, SchemaRef,
+ Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder,
SortOrderRef,
+ StatisticsFile, StructType, TableMetadata, TableProperties,
UNPARTITIONED_LAST_ASSIGNED_ID,
+ UnboundPartitionSpec,
};
use crate::error::{Error, ErrorKind, Result};
use crate::{TableCreation, TableUpdate};
@@ -247,7 +246,7 @@ impl TableMetadataBuilder {
// List of specified properties that are RESERVED and should not be
persisted.
let reserved_properties = properties
.keys()
- .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+ .filter(|key|
TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
.map(ToString::to_string)
.collect::<Vec<_>>();
@@ -285,7 +284,7 @@ impl TableMetadataBuilder {
// disallow removal of reserved properties
let reserved_properties = properties
.iter()
- .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+ .filter(|key|
TableProperties::RESERVED_PROPERTIES.contains(&key.as_str()))
.map(ToString::to_string)
.collect::<Vec<_>>();
@@ -1061,9 +1060,9 @@ impl TableMetadataBuilder {
let max_size = self
.metadata
.properties
- .get(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
+ .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
.and_then(|v| v.parse::<usize>().ok())
- .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
+
.unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
.max(1);
if self.metadata.metadata_log.len() > max_size {
@@ -1360,8 +1359,8 @@ mod tests {
use crate::io::FileIOBuilder;
use crate::spec::{
BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec,
PrimitiveType, Schema,
- SnapshotRetention, SortDirection, SortField, StructType, Summary,
Transform, Type,
- UnboundPartitionField,
+ SnapshotRetention, SortDirection, SortField, StructType, Summary,
TableProperties,
+ Transform, Type, UnboundPartitionField,
};
use crate::table::Table;
@@ -2299,7 +2298,7 @@ mod tests {
let builder = builder_without_changes(FormatVersion::V2);
let metadata = builder
.set_properties(HashMap::from_iter(vec![(
- PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
+
TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
"2".to_string(),
)]))
.unwrap()
diff --git a/crates/iceberg/src/spec/table_properties.rs
b/crates/iceberg/src/spec/table_properties.rs
new file mode 100644
index 00000000..9aa789fe
--- /dev/null
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+// Helper function to parse a property from a HashMap
+// If the property is not found, use the default value
+fn parse_property<T: std::str::FromStr>(
+ properties: &HashMap<String, String>,
+ key: &str,
+ default: T,
+) -> Result<T, anyhow::Error>
+where
+ <T as std::str::FromStr>::Err: std::fmt::Display,
+{
+ properties.get(key).map_or(Ok(default), |value| {
+ value
+ .parse::<T>()
+ .map_err(|e| anyhow::anyhow!("Invalid value for {}: {}", key, e))
+ })
+}
+
+/// TableProperties that contains the properties of a table.
+#[derive(Debug)]
+pub struct TableProperties {
+ /// The number of times to retry a commit.
+ pub commit_num_retries: usize,
+ /// The minimum wait time between retries.
+ pub commit_min_retry_wait_ms: u64,
+ /// The maximum wait time between retries.
+ pub commit_max_retry_wait_ms: u64,
+ /// The total timeout for commit retries.
+ pub commit_total_retry_timeout_ms: u64,
+ /// The default format for files.
+ pub write_format_default: String,
+ /// The target file size for files.
+ pub write_target_file_size_bytes: usize,
+}
+
+impl TableProperties {
+ /// Reserved table property for table format version.
+ ///
+ /// Iceberg will default a new table's format version to the latest stable
and recommended
+ /// version. This reserved property keyword allows users to override the
Iceberg format version of
+ /// the table metadata.
+ ///
+ /// If this table property exists when creating a table, the table will
use the specified format
+ /// version. If a table updates this property, it will try to upgrade to
the specified format
+ /// version.
+ pub const PROPERTY_FORMAT_VERSION: &str = "format-version";
+ /// Reserved table property for table UUID.
+ pub const PROPERTY_UUID: &str = "uuid";
+ /// Reserved table property for the total number of snapshots.
+ pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count";
+ /// Reserved table property for current snapshot summary.
+ pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str =
"current-snapshot-summary";
+ /// Reserved table property for current snapshot id.
+ pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id";
+ /// Reserved table property for current snapshot timestamp.
+ pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str =
"current-snapshot-timestamp-ms";
+ /// Reserved table property for the JSON representation of current schema.
+ pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema";
+ /// Reserved table property for the JSON representation of
current(default) partition spec.
+ pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec";
+ /// Reserved table property for the JSON representation of
current(default) sort order.
+ pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order";
+
+ /// Property key for max number of previous versions to keep.
+ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str =
+ "write.metadata.previous-versions-max";
+ /// Default value for max number of previous versions to keep.
+ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100;
+
+ /// Property key for max number of partitions to keep summary stats for.
+ pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str =
"write.summary.partition-limit";
+ /// Default value for the max number of partitions to keep summary stats
for.
+ pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0;
+
+ /// Reserved Iceberg table properties list.
+ ///
+ /// Reserved table properties are only used to control behaviors when
creating or updating a
+ /// table. The value of these properties are not persisted as a part of
the table metadata.
+ pub const RESERVED_PROPERTIES: [&str; 9] = [
+ Self::PROPERTY_FORMAT_VERSION,
+ Self::PROPERTY_UUID,
+ Self::PROPERTY_SNAPSHOT_COUNT,
+ Self::PROPERTY_CURRENT_SNAPSHOT_ID,
+ Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY,
+ Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP,
+ Self::PROPERTY_CURRENT_SCHEMA,
+ Self::PROPERTY_DEFAULT_PARTITION_SPEC,
+ Self::PROPERTY_DEFAULT_SORT_ORDER,
+ ];
+
+ /// Property key for number of commit retries.
+ pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
+ /// Default value for number of commit retries.
+ pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
+
+ /// Property key for minimum wait time (ms) between retries.
+ pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str =
"commit.retry.min-wait-ms";
+ /// Default value for minimum wait time (ms) between retries.
+ pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
+
+ /// Property key for maximum wait time (ms) between retries.
+ pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str =
"commit.retry.max-wait-ms";
+ /// Default value for maximum wait time (ms) between retries.
+ pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1
minute
+
+ /// Property key for total maximum retry time (ms).
+ pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str =
"commit.retry.total-timeout-ms";
+ /// Default value for total maximum retry time (ms).
+ pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 *
1000; // 30 minutes
+
+ /// Default file format for data files
+ pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default";
+ /// Default file format for delete files
+ pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str =
"write.delete.format.default";
+ /// Default value for data file format
+ pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet";
+
+ /// Target file size for newly written files.
+ pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str =
"write.target-file-size-bytes";
+ /// Default target file size
+ pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 *
1024 * 1024; // 512 MB
+}
+
+impl TryFrom<&HashMap<String, String>> for TableProperties {
+ // parse by entry key or use default value
+ type Error = anyhow::Error;
+
+ fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
+ Ok(TableProperties {
+ commit_num_retries: parse_property(
+ props,
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES,
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
+ )?,
+ commit_min_retry_wait_ms: parse_property(
+ props,
+ TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
+ TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+ )?,
+ commit_max_retry_wait_ms: parse_property(
+ props,
+ TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
+ TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+ )?,
+ commit_total_retry_timeout_ms: parse_property(
+ props,
+ TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
+ TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+ )?,
+ write_format_default: parse_property(
+ props,
+ TableProperties::PROPERTY_DEFAULT_FILE_FORMAT,
+
TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(),
+ )?,
+ write_target_file_size_bytes: parse_property(
+ props,
+ TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ )?,
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_table_properties_default() {
+ let props = HashMap::new();
+ let table_properties = TableProperties::try_from(&props).unwrap();
+ assert_eq!(
+ table_properties.commit_num_retries,
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT
+ );
+ assert_eq!(
+ table_properties.commit_min_retry_wait_ms,
+ TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
+ );
+ assert_eq!(
+ table_properties.commit_max_retry_wait_ms,
+ TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
+ );
+ assert_eq!(
+ table_properties.write_format_default,
+ TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()
+ );
+ assert_eq!(
+ table_properties.write_target_file_size_bytes,
+ TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+ );
+ }
+
+ #[test]
+ fn test_table_properties_valid() {
+ let props = HashMap::from([
+ (
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+ "10".to_string(),
+ ),
+ (
+ TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
+ "20".to_string(),
+ ),
+ (
+ TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(),
+ "avro".to_string(),
+ ),
+ (
+
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+ "512".to_string(),
+ ),
+ ]);
+ let table_properties = TableProperties::try_from(&props).unwrap();
+ assert_eq!(table_properties.commit_num_retries, 10);
+ assert_eq!(table_properties.commit_max_retry_wait_ms, 20);
+ assert_eq!(table_properties.write_format_default, "avro".to_string());
+ assert_eq!(table_properties.write_target_file_size_bytes, 512);
+ }
+
+ #[test]
+ fn test_table_properties_invalid() {
+ let invalid_retries = HashMap::from([(
+ TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(),
+ "abc".to_string(),
+ )]);
+
+ let table_properties =
TableProperties::try_from(&invalid_retries).unwrap_err();
+ assert!(
+ table_properties.to_string().contains(
+ "Invalid value for commit.retry.num-retries: invalid digit
found in string"
+ )
+ );
+
+ let invalid_min_wait = HashMap::from([(
+ TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(),
+ "abc".to_string(),
+ )]);
+ let table_properties =
TableProperties::try_from(&invalid_min_wait).unwrap_err();
+ assert!(
+ table_properties.to_string().contains(
+ "Invalid value for commit.retry.min-wait-ms: invalid digit
found in string"
+ )
+ );
+
+ let invalid_max_wait = HashMap::from([(
+ TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(),
+ "abc".to_string(),
+ )]);
+ let table_properties =
TableProperties::try_from(&invalid_max_wait).unwrap_err();
+ assert!(
+ table_properties.to_string().contains(
+ "Invalid value for commit.retry.max-wait-ms: invalid digit
found in string"
+ )
+ );
+
+ let invalid_target_size = HashMap::from([(
+ TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(),
+ "abc".to_string(),
+ )]);
+ let table_properties =
TableProperties::try_from(&invalid_target_size).unwrap_err();
+ assert!(table_properties.to_string().contains(
+ "Invalid value for write.target-file-size-bytes: invalid digit
found in string"
+ ));
+ }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs
b/crates/iceberg/src/transaction/mod.rs
index 06549a95..26bd6522 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -52,8 +52,6 @@
/// that allows users to apply a transaction action to a `Transaction`.
mod action;
-use std::collections::HashMap;
-
pub use action::*;
mod append;
mod snapshot;
@@ -69,12 +67,7 @@ use std::time::Duration;
use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder,
RetryableWithContext};
use crate::error::Result;
-use crate::spec::{
- PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
- PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
- PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
- PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
-};
+use crate::spec::TableProperties;
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
@@ -170,7 +163,12 @@ impl Transaction {
return Ok(self.table);
}
- let backoff = Self::build_backoff(self.table.metadata().properties())?;
+ let table_props =
+
TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
+ Error::new(ErrorKind::DataInvalid, "Invalid table
properties").with_source(e)
+ })?;
+
+ let backoff = Self::build_backoff(table_props)?;
let tx = self;
(|mut tx: Transaction| async {
@@ -185,53 +183,14 @@ impl Transaction {
.1
}
- fn build_backoff(props: &HashMap<String, String>) ->
Result<ExponentialBackoff> {
- let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) {
- Some(value_str) => value_str.parse::<u64>().map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- "Invalid value for commit.retry.min-wait-ms",
- )
- .with_source(e)
- })?,
- None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
- };
- let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) {
- Some(value_str) => value_str.parse::<u64>().map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- "Invalid value for commit.retry.max-wait-ms",
- )
- .with_source(e)
- })?,
- None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
- };
- let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS)
{
- Some(value_str) => value_str.parse::<u64>().map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- "Invalid value for commit.retry.total-timeout-ms",
- )
- .with_source(e)
- })?,
- None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
- };
- let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) {
- Some(value_str) => value_str.parse::<usize>().map_err(|e| {
- Error::new(
- ErrorKind::DataInvalid,
- "Invalid value for commit.retry.num-retries",
- )
- .with_source(e)
- })?,
- None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
- };
-
+ fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
Ok(ExponentialBuilder::new()
- .with_min_delay(Duration::from_millis(min_delay))
- .with_max_delay(Duration::from_millis(max_delay))
- .with_total_delay(Some(Duration::from_millis(total_delay)))
- .with_max_times(max_times)
+
.with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
+
.with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
+ .with_total_delay(Some(Duration::from_millis(
+ props.commit_total_retry_timeout_ms,
+ )))
+ .with_max_times(props.commit_num_retries)
.with_factor(2.0)
.build())
}
diff --git a/crates/iceberg/src/transaction/snapshot.rs
b/crates/iceberg/src/transaction/snapshot.rs
index 48dc2b5b..a03b8dc4 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -24,10 +24,9 @@ use uuid::Uuid;
use crate::error::Result;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType,
ManifestEntry,
- ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder,
Operation,
- PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT,
PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT,
- Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector,
Struct, StructType,
- Summary, update_snapshot_summaries,
+ ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder,
Operation, Snapshot,
+ SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct,
StructType, Summary,
+ TableProperties, update_snapshot_summaries,
};
use crate::table::Table;
use crate::transaction::ActionCommit;
@@ -322,15 +321,15 @@ impl<'a> SnapshotProducer<'a> {
let partition_summary_limit = if let Some(limit) = table_metadata
.properties()
- .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
+ .get(TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT)
{
if let Ok(limit) = limit.parse::<u64>() {
limit
} else {
- PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+ TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
}
} else {
- PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
+ TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
};
summary_collector.set_partition_summary_limit(partition_summary_limit);
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
index 0b9b105c..00036170 100644
--- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter};
use arrow_array::RecordBatch;
use crate::io::{FileIO, OutputFile};
-use crate::spec::{DataFileBuilder,
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey};
+use crate::spec::{DataFileBuilder, PartitionKey, TableProperties};
use crate::writer::CurrentFileStatus;
use crate::writer::file_writer::location_generator::{FileNameGenerator,
LocationGenerator};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
@@ -95,7 +95,7 @@ where
) -> Self {
Self {
inner_builder,
- target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ target_file_size:
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
file_io,
location_generator,
file_name_generator,
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 759f1a8d..7bff2392 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -36,11 +36,7 @@ use datafusion::physical_plan::{
};
use futures::StreamExt;
use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
-use iceberg::spec::{
- DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT,
PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT,
- PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
- serialize_data_file_to_json,
-};
+use iceberg::spec::{DataFileFormat, TableProperties,
serialize_data_file_to_json};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
@@ -226,8 +222,8 @@ impl ExecutionPlan for IcebergWriteExec {
self.table
.metadata()
.properties()
- .get(PROPERTY_DEFAULT_FILE_FORMAT)
- .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
+ .get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT)
+
.unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()),
)
.map_err(to_datafusion_error)?;
if file_format != DataFileFormat::Parquet {
@@ -250,7 +246,7 @@ impl ExecutionPlan for IcebergWriteExec {
.table
.metadata()
.properties()
- .get(PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
+ .get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES)
{
Some(value_str) => value_str
.parse::<usize>()
@@ -262,7 +258,7 @@ impl ExecutionPlan for IcebergWriteExec {
.with_source(e)
})
.map_err(to_datafusion_error)?,
- None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ None =>
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
};
let file_io = self.table.file_io().clone();