This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3d47be53f refactor(writer): Make writer builders non-consuming in
build (#1889)
3d47be53f is described below
commit 3d47be53fbb2486055d83524a0804079cf973c27
Author: Leonz <[email protected]>
AuthorDate: Thu Dec 4 13:11:54 2025 +0800
refactor(writer): Make writer builders non-consuming in build (#1889)
## Which issue does this PR close?
- Closes #1753.
- Related: https://github.com/apache/iceberg-rust/pull/1735#discussion_r2428605135
## What changes are included in this PR?
This change allows users to reuse builder instances without cloning when
creating multiple writers with the same configuration.
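The `build` method now borrows the builder (`&self`) instead of consuming it (`self`) in:
- `IcebergWriterBuilder`
- `RollingFileWriterBuilder`
- `FileWriterBuilder`
The futures returned by `build` now hold `&self`, and they must be `Send`. For that reason, the builder traits (and the `LocationGenerator` / `FileNameGenerator` types that builders contain) now require `Sync`. `IcebergWriterBuilder` no longer requires `Clone`, and the `Clone` derives were removed from `DataFileWriterBuilder`, `EqualityDeleteFileWriterBuilder` and `EqualityDeleteWriterConfig`.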
## Are these changes tested?
---
crates/iceberg/src/writer/base_writer/data_file_writer.rs | 6 +++---
.../iceberg/src/writer/base_writer/equality_delete_writer.rs | 12 ++++++------
crates/iceberg/src/writer/file_writer/location_generator.rs | 4 ++--
crates/iceberg/src/writer/file_writer/mod.rs | 4 ++--
crates/iceberg/src/writer/file_writer/parquet_writer.rs | 4 ++--
crates/iceberg/src/writer/file_writer/rolling_writer.rs | 12 +++++-------
crates/iceberg/src/writer/mod.rs | 8 +++-----
crates/iceberg/src/writer/partitioning/clustered_writer.rs | 1 -
crates/iceberg/src/writer/partitioning/fanout_writer.rs | 1 -
.../iceberg/src/writer/partitioning/unpartitioned_writer.rs | 2 +-
10 files changed, 24 insertions(+), 30 deletions(-)
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index dcaa56cc9..cb7bd172e 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -27,7 +27,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter,
IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};
/// Builder for `DataFileWriter`.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator,
F: FileNameGenerator> {
inner: RollingFileWriterBuilder<B, L, F>,
}
@@ -53,9 +53,9 @@ where
{
type R = DataFileWriter<B, L, F>;
- async fn build(self, partition_key: Option<PartitionKey>) ->
Result<Self::R> {
+ async fn build(&self, partition_key: Option<PartitionKey>) ->
Result<Self::R> {
Ok(DataFileWriter {
- inner: Some(self.inner.clone().build()),
+ inner: Some(self.inner.build()),
partition_key,
})
}
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 664ea8433..cd0b19148 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -34,7 +34,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};
/// Builder for `EqualityDeleteWriter`.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub struct EqualityDeleteFileWriterBuilder<
B: FileWriterBuilder,
L: LocationGenerator,
@@ -60,7 +60,7 @@ where
}
/// Config for `EqualityDeleteWriter`.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
pub struct EqualityDeleteWriterConfig {
// Field ids used to determine row equality in equality delete files.
equality_ids: Vec<i32>,
@@ -123,11 +123,11 @@ where
{
type R = EqualityDeleteFileWriter<B, L, F>;
- async fn build(self, partition_key: Option<PartitionKey>) ->
Result<Self::R> {
+ async fn build(&self, partition_key: Option<PartitionKey>) ->
Result<Self::R> {
Ok(EqualityDeleteFileWriter {
- inner: Some(self.inner.clone().build()),
- projector: self.config.projector,
- equality_ids: self.config.equality_ids,
+ inner: Some(self.inner.build()),
+ projector: self.config.projector.clone(),
+ equality_ids: self.config.equality_ids.clone(),
partition_key,
})
}
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs
b/crates/iceberg/src/writer/file_writer/location_generator.rs
index a5cfc2829..0ad4d91ac 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -24,7 +24,7 @@ use crate::Result;
use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
/// `LocationGenerator` used to generate the location of data file.
-pub trait LocationGenerator: Clone + Send + 'static {
+pub trait LocationGenerator: Clone + Send + Sync + 'static {
/// Generate an absolute path for the given file name that includes the
partition path.
///
/// # Arguments
@@ -94,7 +94,7 @@ impl LocationGenerator for DefaultLocationGenerator {
}
/// `FileNameGeneratorTrait` used to generate file name for data file. The
file name can be passed to `LocationGenerator` to generate the location of the
file.
-pub trait FileNameGenerator: Clone + Send + 'static {
+pub trait FileNameGenerator: Clone + Send + Sync + 'static {
/// Generate a file name.
fn generate_file_name(&self) -> String;
}
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs
b/crates/iceberg/src/writer/file_writer/mod.rs
index 2ed6414ce..101919f5b 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -36,11 +36,11 @@ pub mod rolling_writer;
type DefaultOutput = Vec<DataFileBuilder>;
/// File writer builder trait.
-pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
+pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
/// The associated file writer type.
type R: FileWriter<O>;
/// Build file writer.
- fn build(self, output_file: OutputFile) -> impl Future<Output =
Result<Self::R>> + Send;
+ fn build(&self, output_file: OutputFile) -> impl Future<Output =
Result<Self::R>> + Send;
}
/// File writer focus on writing record batch to different physical file
format.(Such as parquet. orc)
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 3e9d1715c..5cf031a9f 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -81,11 +81,11 @@ impl ParquetWriterBuilder {
impl FileWriterBuilder for ParquetWriterBuilder {
type R = ParquetWriter;
- async fn build(self, output_file: OutputFile) -> Result<Self::R> {
+ async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
Ok(ParquetWriter {
schema: self.schema.clone(),
inner_writer: None,
- writer_properties: self.props,
+ writer_properties: self.props.clone(),
current_row_num: 0,
output_file,
nan_value_count_visitor:
NanValueCountVisitor::new_with_match_mode(self.match_mode),
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
index 8f0365478..06246ab66 100644
--- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -103,15 +103,15 @@ where
}
/// Build a new [`RollingFileWriter`].
- pub fn build(self) -> RollingFileWriter<B, L, F> {
+ pub fn build(&self) -> RollingFileWriter<B, L, F> {
RollingFileWriter {
inner: None,
- inner_builder: self.inner_builder,
+ inner_builder: self.inner_builder.clone(),
target_file_size: self.target_file_size,
data_file_builders: vec![],
- file_io: self.file_io,
- location_generator: self.location_generator,
- file_name_generator: self.file_name_generator,
+ file_io: self.file_io.clone(),
+ location_generator: self.location_generator.clone(),
+ file_name_generator: self.file_name_generator.clone(),
}
}
}
@@ -192,7 +192,6 @@ where
// initialize inner writer
self.inner = Some(
self.inner_builder
- .clone()
.build(self.new_output_file(partition_key)?)
.await?,
);
@@ -206,7 +205,6 @@ where
// start a new writer
self.inner = Some(
self.inner_builder
- .clone()
.build(self.new_output_file(partition_key)?)
.await?,
);
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index a7892d49e..d47523068 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -148,7 +148,7 @@
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for
LatencyRecordWriterBuilder<B> {
//! type R = LatencyRecordWriter<B::R>;
//!
-//! async fn build(self, partition_key: Option<PartitionKey>) ->
Result<Self::R> {
+//! async fn build(&self, partition_key: Option<PartitionKey>) ->
Result<Self::R> {
//! Ok(LatencyRecordWriter {
//! inner_writer:
self.inner_writer_builder.build(partition_key).await?,
//! })
@@ -398,13 +398,11 @@ type DefaultOutput = Vec<DataFile>;
/// The builder for iceberg writer.
#[async_trait::async_trait]
-pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
- Send + Clone + 'static
-{
+pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send +
Sync + 'static {
/// The associated writer type.
type R: IcebergWriter<I, O>;
/// Build the iceberg writer with an optional partition key.
- async fn build(self, partition_key: Option<PartitionKey>) ->
Result<Self::R>;
+ async fn build(&self, partition_key: Option<PartitionKey>) ->
Result<Self::R>;
}
/// The iceberg writer used to write data to iceberg table.
diff --git a/crates/iceberg/src/writer/partitioning/clustered_writer.rs
b/crates/iceberg/src/writer/partitioning/clustered_writer.rs
index 358772396..01eb45208 100644
--- a/crates/iceberg/src/writer/partitioning/clustered_writer.rs
+++ b/crates/iceberg/src/writer/partitioning/clustered_writer.rs
@@ -118,7 +118,6 @@ where
// Create a new writer for the new partition
self.current_writer = Some(
self.inner_builder
- .clone()
.build(Some(partition_key.clone()))
.await?,
);
diff --git a/crates/iceberg/src/writer/partitioning/fanout_writer.rs
b/crates/iceberg/src/writer/partitioning/fanout_writer.rs
index 796c1a488..21a174b0d 100644
--- a/crates/iceberg/src/writer/partitioning/fanout_writer.rs
+++ b/crates/iceberg/src/writer/partitioning/fanout_writer.rs
@@ -73,7 +73,6 @@ where
if !self.partition_writers.contains_key(partition_key.data()) {
let writer = self
.inner_builder
- .clone()
.build(Some(partition_key.clone()))
.await?;
self.partition_writers
diff --git a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
index 0fb9cba3f..29825a541 100644
--- a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
+++ b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
@@ -75,7 +75,7 @@ where
pub async fn write(&mut self, input: I) -> Result<()> {
// Lazily create writer on first write
if self.writer.is_none() {
- self.writer = Some(self.inner_builder.clone().build(None).await?);
+ self.writer = Some(self.inner_builder.build(None).await?);
}
// Write directly to inner writer