CTTY commented on code in PR #1547:
URL: https://github.com/apache/iceberg-rust/pull/1547#discussion_r2232078220


##########
crates/iceberg/src/writer/file_writer/rolling_writer.rs:
##########
@@ -0,0 +1,331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::RecordBatch;
+use futures::future::try_join_all;
+
+use crate::runtime::{JoinHandle, spawn};
+use crate::spec::DataFileBuilder;
+use crate::writer::CurrentFileStatus;
+use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builder for creating a `RollingFileWriter` that rolls over to a new file
+/// when the data size exceeds a target threshold.
+#[derive(Clone)]
+pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
+    inner_builder: B,
+    target_size: usize,
+}
+
+impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
+    /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
+    ///
+    /// # Arguments
+    ///
+    /// * `inner_builder` - The builder for the underlying file writer
+    /// * `target_size` - The target size in bytes before rolling over to a new file
+    pub fn new(inner_builder: B, target_size: usize) -> Self {
+        Self {
+            inner_builder,
+            target_size,
+        }
+    }
+}
+
+impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
+    type R = RollingFileWriter<B>;
+
+    async fn build(self) -> Result<Self::R> {
+        Ok(RollingFileWriter {
+            inner: None,
+            inner_builder: self.inner_builder,
+            target_size: self.target_size,
+            close_handles: vec![],
+        })
+    }
+}
+
+/// A writer that automatically rolls over to a new file when the data size
+/// exceeds a target threshold.
+///
+/// This writer wraps another file writer that tracks the amount of data written.
+/// When the data size exceeds the target size, it closes the current file and
+/// starts writing to a new one.
+pub struct RollingFileWriter<B: FileWriterBuilder> {
+    inner: Option<B::R>,
+    inner_builder: B,
+    target_size: usize,
+    close_handles: Vec<JoinHandle<Result<Vec<DataFileBuilder>>>>,
+}
+
+impl<B: FileWriterBuilder> RollingFileWriter<B> {
+    /// Determines if the writer should roll over to a new file.
+    ///
+    /// # Arguments
+    ///
+    /// * `input_size` - The size in bytes of the incoming data
+    ///
+    /// # Returns
+    ///
+    /// `true` if a new file should be started, `false` otherwise
+    pub fn should_roll(&self, input_size: usize) -> bool {
+        self.current_written_size() + input_size > self.target_size
+    }
+}
+
+impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
+    async fn write(&mut self, input: &RecordBatch) -> Result<()> {
+        let input_size = input.get_array_memory_size();

Review Comment:
   Hi @liurenjie1024, after testing with the suggested changes, I found an interesting issue:
   
   TL;DR: the existing `ParquetWriter` only reports the correct `current_written_size` while it is closing and flushing data, not while writing.
   
   This can cause the following case to fail:
   ```
   let writer: RollingWriter = ...
   
   // should create 1 file,
   // but won't update current_written_size because write() never closes the inner writer
   writer.write(batch1).await?
   
   // this write should roll over, but since inner.current_written_size is never updated,
   // it will write the data to the same file as the previous batch
   writer.write(batch2).await?
   ```
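   
   To make this concrete, here's a minimal check against the traits in this PR (`writer` and `batch` are hypothetical values, just for illustration):
   
   ```
   let before = writer.current_written_size();
   writer.write(&batch).await?;
   // The batch is still buffered in the unflushed row group, so the
   // tracked size hasn't moved and should_roll() can never fire here.
   assert_eq!(writer.current_written_size(), before);
   ```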
   
   A more detailed analysis:
   - `ParquetWriter` uses parquet's `AsyncArrowWriter` as its inner writer
   - `AsyncArrowWriter` is composed of a _sync_writer_ (an `ArrowWriter`, which encodes row groups in memory via `ArrowRowGroupWriter`) and an _async_writer_ (`TrackWriter` in this case)
   - the _sync_writer_ buffers rows up to the configured `max_row_group_size` (default is 1024 x 1024), so `TrackWriter` never sees that buffered data until the row group is flushed at close time
   
   In short, this issue shows up often when `max_row_group_size` is large and `target_file_size` is small, since all of the data may still be sitting in the unflushed row-group buffer when the rolling check runs.
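   
   One possible direction (just a sketch, not something I've verified in this PR): have `ParquetWriter`'s size reporting include the in-progress row-group buffer. This assumes a parquet version where `AsyncArrowWriter` exposes `bytes_written()` and `in_progress_size()`:
   
   ```
   use parquet::arrow::async_writer::{AsyncArrowWriter, AsyncFileWriter};
   
   /// Sketch only: report bytes already flushed to the underlying (tracked)
   /// writer plus bytes still buffered in the current row group, so the
   /// rolling check can see data before the first flush/close.
   fn estimated_written_size<W: AsyncFileWriter>(writer: &AsyncArrowWriter<W>) -> usize {
       writer.bytes_written() + writer.in_progress_size()
   }
   ```
   
   With an estimate like this, `should_roll` would react to buffered rows immediately instead of only after the writer closes.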
   


