This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d47be53f refactor(writer): Make writer builders non-consuming in 
build (#1889)
3d47be53f is described below

commit 3d47be53fbb2486055d83524a0804079cf973c27
Author: Leonz <[email protected]>
AuthorDate: Thu Dec 4 13:11:54 2025 +0800

    refactor(writer): Make writer builders non-consuming in build (#1889)
    
    ## Which issue does this PR close?
    
    - Closes #1753.
    - Related
    https://github.com/apache/iceberg-rust/pull/1735#discussion_r2428605135
    
    ## What changes are included in this PR?
    This change allows users to reuse builder instances without cloning when
    creating multiple writers with the same configuration.
    
    Change `build` to borrow `&self` instead of consuming `self` in:
    - `IcebergWriterBuilder`
    - `RollingFileWriterBuilder`
    - `FileWriterBuilder`
    
    
    ## Are these changes tested?
---
 crates/iceberg/src/writer/base_writer/data_file_writer.rs    |  6 +++---
 .../iceberg/src/writer/base_writer/equality_delete_writer.rs | 12 ++++++------
 crates/iceberg/src/writer/file_writer/location_generator.rs  |  4 ++--
 crates/iceberg/src/writer/file_writer/mod.rs                 |  4 ++--
 crates/iceberg/src/writer/file_writer/parquet_writer.rs      |  4 ++--
 crates/iceberg/src/writer/file_writer/rolling_writer.rs      | 12 +++++-------
 crates/iceberg/src/writer/mod.rs                             |  8 +++-----
 crates/iceberg/src/writer/partitioning/clustered_writer.rs   |  1 -
 crates/iceberg/src/writer/partitioning/fanout_writer.rs      |  1 -
 .../iceberg/src/writer/partitioning/unpartitioned_writer.rs  |  2 +-
 10 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs 
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index dcaa56cc9..cb7bd172e 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -27,7 +27,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter, 
IcebergWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
 /// Builder for `DataFileWriter`.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, 
F: FileNameGenerator> {
     inner: RollingFileWriterBuilder<B, L, F>,
 }
@@ -53,9 +53,9 @@ where
 {
     type R = DataFileWriter<B, L, F>;
 
-    async fn build(self, partition_key: Option<PartitionKey>) -> 
Result<Self::R> {
+    async fn build(&self, partition_key: Option<PartitionKey>) -> 
Result<Self::R> {
         Ok(DataFileWriter {
-            inner: Some(self.inner.clone().build()),
+            inner: Some(self.inner.build()),
             partition_key,
         })
     }
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs 
b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 664ea8433..cd0b19148 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -34,7 +34,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
 /// Builder for `EqualityDeleteWriter`.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct EqualityDeleteFileWriterBuilder<
     B: FileWriterBuilder,
     L: LocationGenerator,
@@ -60,7 +60,7 @@ where
 }
 
 /// Config for `EqualityDeleteWriter`.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
 pub struct EqualityDeleteWriterConfig {
     // Field ids used to determine row equality in equality delete files.
     equality_ids: Vec<i32>,
@@ -123,11 +123,11 @@ where
 {
     type R = EqualityDeleteFileWriter<B, L, F>;
 
-    async fn build(self, partition_key: Option<PartitionKey>) -> 
Result<Self::R> {
+    async fn build(&self, partition_key: Option<PartitionKey>) -> 
Result<Self::R> {
         Ok(EqualityDeleteFileWriter {
-            inner: Some(self.inner.clone().build()),
-            projector: self.config.projector,
-            equality_ids: self.config.equality_ids,
+            inner: Some(self.inner.build()),
+            projector: self.config.projector.clone(),
+            equality_ids: self.config.equality_ids.clone(),
             partition_key,
         })
     }
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs 
b/crates/iceberg/src/writer/file_writer/location_generator.rs
index a5cfc2829..0ad4d91ac 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -24,7 +24,7 @@ use crate::Result;
 use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
 
 /// `LocationGenerator` used to generate the location of data file.
-pub trait LocationGenerator: Clone + Send + 'static {
+pub trait LocationGenerator: Clone + Send + Sync + 'static {
     /// Generate an absolute path for the given file name that includes the 
partition path.
     ///
     /// # Arguments
@@ -94,7 +94,7 @@ impl LocationGenerator for DefaultLocationGenerator {
 }
 
 /// `FileNameGeneratorTrait` used to generate file name for data file. The 
file name can be passed to `LocationGenerator` to generate the location of the 
file.
-pub trait FileNameGenerator: Clone + Send + 'static {
+pub trait FileNameGenerator: Clone + Send + Sync + 'static {
     /// Generate a file name.
     fn generate_file_name(&self) -> String;
 }
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs 
b/crates/iceberg/src/writer/file_writer/mod.rs
index 2ed6414ce..101919f5b 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -36,11 +36,11 @@ pub mod rolling_writer;
 type DefaultOutput = Vec<DataFileBuilder>;
 
 /// File writer builder trait.
-pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
+pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
     /// The associated file writer type.
     type R: FileWriter<O>;
     /// Build file writer.
-    fn build(self, output_file: OutputFile) -> impl Future<Output = 
Result<Self::R>> + Send;
+    fn build(&self, output_file: OutputFile) -> impl Future<Output = 
Result<Self::R>> + Send;
 }
 
 /// File writer focus on writing record batch to different physical file 
format.(Such as parquet. orc)
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs 
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 3e9d1715c..5cf031a9f 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -81,11 +81,11 @@ impl ParquetWriterBuilder {
 impl FileWriterBuilder for ParquetWriterBuilder {
     type R = ParquetWriter;
 
-    async fn build(self, output_file: OutputFile) -> Result<Self::R> {
+    async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
         Ok(ParquetWriter {
             schema: self.schema.clone(),
             inner_writer: None,
-            writer_properties: self.props,
+            writer_properties: self.props.clone(),
             current_row_num: 0,
             output_file,
             nan_value_count_visitor: 
NanValueCountVisitor::new_with_match_mode(self.match_mode),
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs 
b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
index 8f0365478..06246ab66 100644
--- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -103,15 +103,15 @@ where
     }
 
     /// Build a new [`RollingFileWriter`].
-    pub fn build(self) -> RollingFileWriter<B, L, F> {
+    pub fn build(&self) -> RollingFileWriter<B, L, F> {
         RollingFileWriter {
             inner: None,
-            inner_builder: self.inner_builder,
+            inner_builder: self.inner_builder.clone(),
             target_file_size: self.target_file_size,
             data_file_builders: vec![],
-            file_io: self.file_io,
-            location_generator: self.location_generator,
-            file_name_generator: self.file_name_generator,
+            file_io: self.file_io.clone(),
+            location_generator: self.location_generator.clone(),
+            file_name_generator: self.file_name_generator.clone(),
         }
     }
 }
@@ -192,7 +192,6 @@ where
             // initialize inner writer
             self.inner = Some(
                 self.inner_builder
-                    .clone()
                     .build(self.new_output_file(partition_key)?)
                     .await?,
             );
@@ -206,7 +205,6 @@ where
                 // start a new writer
                 self.inner = Some(
                     self.inner_builder
-                        .clone()
                         .build(self.new_output_file(partition_key)?)
                         .await?,
                 );
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index a7892d49e..d47523068 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -148,7 +148,7 @@
 //! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for 
LatencyRecordWriterBuilder<B> {
 //!     type R = LatencyRecordWriter<B::R>;
 //!
-//!     async fn build(self, partition_key: Option<PartitionKey>) -> 
Result<Self::R> {
+//!     async fn build(&self, partition_key: Option<PartitionKey>) -> 
Result<Self::R> {
 //!         Ok(LatencyRecordWriter {
 //!             inner_writer: 
self.inner_writer_builder.build(partition_key).await?,
 //!         })
@@ -398,13 +398,11 @@ type DefaultOutput = Vec<DataFile>;
 
 /// The builder for iceberg writer.
 #[async_trait::async_trait]
-pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
-    Send + Clone + 'static
-{
+pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + 
Sync + 'static {
     /// The associated writer type.
     type R: IcebergWriter<I, O>;
     /// Build the iceberg writer with an optional partition key.
-    async fn build(self, partition_key: Option<PartitionKey>) -> 
Result<Self::R>;
+    async fn build(&self, partition_key: Option<PartitionKey>) -> 
Result<Self::R>;
 }
 
 /// The iceberg writer used to write data to iceberg table.
diff --git a/crates/iceberg/src/writer/partitioning/clustered_writer.rs 
b/crates/iceberg/src/writer/partitioning/clustered_writer.rs
index 358772396..01eb45208 100644
--- a/crates/iceberg/src/writer/partitioning/clustered_writer.rs
+++ b/crates/iceberg/src/writer/partitioning/clustered_writer.rs
@@ -118,7 +118,6 @@ where
             // Create a new writer for the new partition
             self.current_writer = Some(
                 self.inner_builder
-                    .clone()
                     .build(Some(partition_key.clone()))
                     .await?,
             );
diff --git a/crates/iceberg/src/writer/partitioning/fanout_writer.rs 
b/crates/iceberg/src/writer/partitioning/fanout_writer.rs
index 796c1a488..21a174b0d 100644
--- a/crates/iceberg/src/writer/partitioning/fanout_writer.rs
+++ b/crates/iceberg/src/writer/partitioning/fanout_writer.rs
@@ -73,7 +73,6 @@ where
         if !self.partition_writers.contains_key(partition_key.data()) {
             let writer = self
                 .inner_builder
-                .clone()
                 .build(Some(partition_key.clone()))
                 .await?;
             self.partition_writers
diff --git a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs 
b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
index 0fb9cba3f..29825a541 100644
--- a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
+++ b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
@@ -75,7 +75,7 @@ where
     pub async fn write(&mut self, input: I) -> Result<()> {
         // Lazily create writer on first write
         if self.writer.is_none() {
-            self.writer = Some(self.inner_builder.clone().build(None).await?);
+            self.writer = Some(self.inner_builder.build(None).await?);
         }
 
         // Write directly to inner writer

Reply via email to