This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 9844638c4 feat: make FanoutWriter writer configurable (#1962)
9844638c4 is described below
commit 9844638c48e2d4b123c9df4f89330cfd1e8bfcba
Author: Alan Tang <[email protected]>
AuthorDate: Tue Jan 6 10:43:11 2026 +0800
feat: make FanoutWriter writer configurable (#1962)
## Which issue does this PR close?
- Closes #1834.
## What changes are included in this PR?
- Follow-up on #1872.
## Are these changes tested?
---------
Signed-off-by: StandingMan <[email protected]>
---
crates/iceberg/src/spec/table_properties.rs | 12 +++++++++++
.../datafusion/src/physical_plan/write.rs | 24 ++++++++++++++++++++--
2 files changed, 34 insertions(+), 2 deletions(-)
diff --git a/crates/iceberg/src/spec/table_properties.rs
b/crates/iceberg/src/spec/table_properties.rs
index 497545601..413604f51 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -49,6 +49,8 @@ pub struct TableProperties {
pub write_format_default: String,
/// The target file size for files.
pub write_target_file_size_bytes: usize,
+ /// Whether to use `FanoutWriter` for partitioned tables.
+ pub write_datafusion_fanout_enabled: bool,
}
impl TableProperties {
@@ -137,6 +139,11 @@ impl TableProperties {
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str =
"write.target-file-size-bytes";
/// Default target file size
pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 *
1024 * 1024; // 512 MB
+ /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted
data).
+ /// If false, uses `ClusteredWriter` (requires sorted data, more memory
efficient).
+ pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str =
"write.datafusion.fanout.enabled";
+ /// Default value for fanout writer enabled
+ pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true;
}
impl TryFrom<&HashMap<String, String>> for TableProperties {
@@ -175,6 +182,11 @@ impl TryFrom<&HashMap<String, String>> for TableProperties
{
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)?,
+ write_datafusion_fanout_enabled: parse_property(
+ props,
+ TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT,
+ )?,
})
}
}
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 9eb53c235..fdfddf877 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -266,8 +266,28 @@ impl ExecutionPlan for IcebergWriteExec {
let data_file_writer_builder =
DataFileWriterBuilder::new(rolling_writer_builder);
// Create TaskWriter
- // TODO: Make fanout_enabled configurable via table properties
- let fanout_enabled = true;
+ let fanout_enabled = self
+ .table
+ .metadata()
+ .properties()
+ .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
+ .map(|value| {
+ value
+ .parse::<bool>()
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid value for {}, expected 'true' or
'false'",
+
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
+ ),
+ )
+ .with_source(e)
+ })
+ .map_err(to_datafusion_error)
+ })
+ .transpose()?
+
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
let schema = self.table.metadata().current_schema().clone();
let partition_spec =
self.table.metadata().default_partition_spec().clone();
let task_writer = TaskWriter::try_new(