This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new fc3d475  refactor: Implement ArrowAsyncFileWriter directly to remove tokio (#427)
fc3d475 is described below

commit fc3d4757996c20ca50fccebf25a0e6288abeb9d8
Author: Xuanwo <[email protected]>
AuthorDate: Tue Jul 2 10:33:01 2024 +0800

    refactor: Implement ArrowAsyncFileWriter directly to remove tokio (#427)
    
    * refactor: Implement ArrowAsyncFileWriter directly to remove tokio
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Make build pass
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 crates/iceberg/Cargo.toml                          |   2 +-
 .../src/writer/file_writer/parquet_writer.rs       | 129 +++++----------------
 2 files changed, 33 insertions(+), 98 deletions(-)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 5404ac9..6cad4ad 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -71,7 +71,6 @@ serde_derive = { workspace = true }
 serde_json = { workspace = true }
 serde_repr = { workspace = true }
 serde_with = { workspace = true }
-tokio = { workspace = true }
 typed-builder = { workspace = true }
 url = { workspace = true }
 urlencoding = { workspace = true }
@@ -82,3 +81,4 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
+tokio = { workspace = true }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 50a507e..43e49fc 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -17,6 +17,11 @@
 
 //! The module contains the file writer for parquet file format.
 
+use super::{
+    location_generator::{FileNameGenerator, LocationGenerator},
+    track_writer::TrackWriter,
+    FileWriter, FileWriterBuilder,
+};
 use crate::arrow::DEFAULT_MAP_FIELD_NAME;
 use crate::spec::{
     visit_schema, Datum, ListType, MapType, NestedFieldRef, PrimitiveLiteral, PrimitiveType,
@@ -40,25 +45,19 @@ use parquet::data_type::{
 };
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::TypedStatistics;
-use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
+use parquet::{
+    arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter, arrow::AsyncArrowWriter,
+    format::FileMetaData,
+};
 use parquet::{
     data_type::{ByteArray, FixedLenByteArray},
     file::statistics::{from_thrift, Statistics},
 };
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::{
-    collections::HashMap,
-    sync::{atomic::AtomicI64, Arc},
-};
+use std::collections::HashMap;
+use std::sync::atomic::AtomicI64;
+use std::sync::Arc;
 use uuid::Uuid;
 
-use super::{
-    location_generator::{FileNameGenerator, LocationGenerator},
-    track_writer::TrackWriter,
-    FileWriter, FileWriterBuilder,
-};
-
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
 #[derive(Clone)]
 pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
@@ -571,102 +570,38 @@ impl CurrentFileStatus for ParquetWriter {
 /// # NOTES
 ///
 /// We keep this wrapper been used inside only.
-///
-/// # TODO
-///
-/// Maybe we can use the buffer from ArrowWriter directly.
-struct AsyncFileWriter<W: FileWrite>(State<W>);
-
-enum State<W: FileWrite> {
-    Idle(Option<W>),
-    Write(BoxFuture<'static, (W, Result<()>)>),
-    Close(BoxFuture<'static, (W, Result<()>)>),
-}
+struct AsyncFileWriter<W: FileWrite>(W);
 
 impl<W: FileWrite> AsyncFileWriter<W> {
     /// Create a new `AsyncFileWriter` with the given writer.
     pub fn new(writer: W) -> Self {
-        Self(State::Idle(Some(writer)))
+        Self(writer)
     }
 }
 
-impl<W: FileWrite> tokio::io::AsyncWrite for AsyncFileWriter<W> {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, std::io::Error>> {
-        let this = self.get_mut();
-        loop {
-            match &mut this.0 {
-                State::Idle(w) => {
-                    let mut writer = w.take().unwrap();
-                    let bs = Bytes::copy_from_slice(buf);
-                    let fut = async move {
-                        let res = writer.write(bs).await;
-                        (writer, res)
-                    };
-                    this.0 = State::Write(Box::pin(fut));
-                }
-                State::Write(fut) => {
-                    let (writer, res) = futures::ready!(fut.as_mut().poll(cx));
-                    this.0 = State::Idle(Some(writer));
-                    return Poll::Ready(res.map(|_| buf.len()).map_err(|err| {
-                        std::io::Error::new(std::io::ErrorKind::Other, Box::new(err))
-                    }));
-                }
-                State::Close(_) => {
-                    return Poll::Ready(Err(std::io::Error::new(
-                        std::io::ErrorKind::Other,
-                        "file is closed",
-                    )));
-                }
-            }
-        }
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), std::io::Error>> {
-        Poll::Ready(Ok(()))
+impl<W: FileWrite> ArrowAsyncFileWriter for AsyncFileWriter<W> {
+    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
+        Box::pin(async {
+            self.0
+                .write(bs)
+                .await
+                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
+        })
     }
 
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), std::io::Error>> {
-        let this = self.get_mut();
-        loop {
-            match &mut this.0 {
-                State::Idle(w) => {
-                    let mut writer = w.take().unwrap();
-                    let fut = async move {
-                        let res = writer.close().await;
-                        (writer, res)
-                    };
-                    this.0 = State::Close(Box::pin(fut));
-                }
-                State::Write(_) => {
-                    return Poll::Ready(Err(std::io::Error::new(
-                        std::io::ErrorKind::Other,
-                        "file is writing",
-                    )));
-                }
-                State::Close(fut) => {
-                    let (writer, res) = futures::ready!(fut.as_mut().poll(cx));
-                    this.0 = State::Idle(Some(writer));
-                    return Poll::Ready(res.map_err(|err| {
-                        std::io::Error::new(std::io::ErrorKind::Other, Box::new(err))
-                    }));
-                }
-            }
-        }
+    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
+        Box::pin(async {
+            self.0
+                .close()
+                .await
+                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err)))
+        })
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use anyhow::Result;
@@ -686,8 +621,8 @@ mod tests {
 
     use super::*;
     use crate::io::FileIOBuilder;
-    use crate::spec::NestedField;
-    use crate::spec::Struct;
+    use crate::spec::*;
+    use crate::spec::{PrimitiveLiteral, Struct};
     use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
     use crate::writer::tests::check_parquet_data_file;

Reply via email to