This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new aba6209  feat: init iceberg writer (#275)
aba6209 is described below

commit aba620900e99423bbd3fed969618e67e58a03a7b
Author: ZENOTME <[email protected]>
AuthorDate: Mon Apr 22 22:31:29 2024 +0800

    feat: init iceberg writer (#275)
    
    * init iceberg writer
    
    * refine
    
    * refine the interface
    
    ---------
    
    Co-authored-by: ZENOTME <[email protected]>
---
 Cargo.toml                                         |   1 +
 crates/iceberg/Cargo.toml                          |   1 +
 .../src/writer/base_writer/data_file_writer.rs     | 318 +++++++++++++++++++++
 crates/iceberg/src/writer/{ => base_writer}/mod.rs |  19 +-
 crates/iceberg/src/writer/file_writer/mod.rs       |   6 +-
 .../src/writer/file_writer/parquet_writer.rs       | 126 ++------
 crates/iceberg/src/writer/mod.rs                   | 168 ++++++++++-
 7 files changed, 506 insertions(+), 133 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 48c242f..3c2923d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ array-init = "2"
 arrow-arith = { version = "51" }
 arrow-array = { version = "51" }
 arrow-schema = { version = "51" }
+arrow-select = { version = "51" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 aws-config = "1.1.8"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 5aea856..46f167b 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -35,6 +35,7 @@ array-init = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
 arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs 
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
new file mode 100644
index 0000000..442c9f1
--- /dev/null
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides `DataFileWriter`.
+
+use crate::spec::{DataContentType, DataFile, Struct};
+use crate::writer::file_writer::FileWriter;
+use crate::writer::CurrentFileStatus;
+use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter, 
IcebergWriterBuilder};
+use crate::Result;
+use arrow_array::RecordBatch;
+use itertools::Itertools;
+
+/// Builder for `DataFileWriter`.
+#[derive(Clone)]
+pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
+    inner: B,
+}
+
+impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
+    /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
+    pub fn new(inner: B) -> Self {
+        Self { inner }
+    }
+}
+
+/// Config for `DataFileWriter`.
+pub struct DataFileWriterConfig {
+    partition_value: Struct,
+}
+
+impl DataFileWriterConfig {
+    /// Create a new `DataFileWriterConfig` with partition value.
+    pub fn new(partition_value: Option<Struct>) -> Self {
+        Self {
+            partition_value: partition_value.unwrap_or(Struct::empty()),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
+    type R = DataFileWriter<B>;
+    type C = DataFileWriterConfig;
+
+    async fn build(self, config: Self::C) -> Result<Self::R> {
+        Ok(DataFileWriter {
+            inner_writer: Some(self.inner.clone().build().await?),
+            partition_value: config.partition_value,
+        })
+    }
+}
+
+/// A writer that writes data files within one spec/partition.
+pub struct DataFileWriter<B: FileWriterBuilder> {
+    inner_writer: Option<B::R>,
+    partition_value: Struct,
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+    async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+        self.inner_writer.as_mut().unwrap().write(&batch).await
+    }
+
+    async fn close(&mut self) -> Result<Vec<DataFile>> {
+        let writer = self.inner_writer.take().unwrap();
+        Ok(writer
+            .close()
+            .await?
+            .into_iter()
+            .map(|mut res| {
+                res.content(DataContentType::Data);
+                res.partition(self.partition_value.clone());
+                res.build().expect("Guaranteed to be valid")
+            })
+            .collect_vec())
+    }
+}
+
+impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
+    fn current_file_path(&self) -> String {
+        self.inner_writer.as_ref().unwrap().current_file_path()
+    }
+
+    fn current_row_num(&self) -> usize {
+        self.inner_writer.as_ref().unwrap().current_row_num()
+    }
+
+    fn current_written_size(&self) -> usize {
+        self.inner_writer.as_ref().unwrap().current_written_size()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::{collections::HashMap, sync::Arc};
+
+    use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch, 
StructArray};
+    use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, 
file::properties::WriterProperties};
+    use tempfile::TempDir;
+
+    use crate::{
+        io::FileIOBuilder,
+        spec::DataFileFormat,
+        writer::{
+            base_writer::data_file_writer::{DataFileWriterBuilder, 
DataFileWriterConfig},
+            file_writer::{
+                location_generator::{test::MockLocationGenerator, 
DefaultFileNameGenerator},
+                ParquetWriterBuilder,
+            },
+            tests::check_parquet_data_file,
+            IcebergWriter, IcebergWriterBuilder,
+        },
+    };
+
+    #[tokio::test]
+    async fn test_data_file_writer() -> Result<(), anyhow::Error> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, 
DataFileFormat::Parquet);
+
+        // prepare data
+        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col0", 
arrow_schema::DataType::Int64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "0".to_string(),
+                    )])),
+                arrow_schema::Field::new(
+                    "col1",
+                    arrow_schema::DataType::Struct(
+                        vec![arrow_schema::Field::new(
+                            "sub_col",
+                            arrow_schema::DataType::Int64,
+                            true,
+                        )
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "5".to_string(),
+                        )]))]
+                        .into(),
+                    ),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "1".to_string(),
+                )])),
+                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, 
true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"2".to_string())]),
+                ),
+                arrow_schema::Field::new(
+                    "col3",
+                    arrow_schema::DataType::List(Arc::new(
+                        arrow_schema::Field::new("item", 
arrow_schema::DataType::Int64, true)
+                            .with_metadata(HashMap::from([(
+                                PARQUET_FIELD_ID_META_KEY.to_string(),
+                                "6".to_string(),
+                            )])),
+                    )),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "3".to_string(),
+                )])),
+                arrow_schema::Field::new(
+                    "col4",
+                    arrow_schema::DataType::Struct(
+                        vec![arrow_schema::Field::new(
+                            "sub_col",
+                            arrow_schema::DataType::Struct(
+                                vec![arrow_schema::Field::new(
+                                    "sub_sub_col",
+                                    arrow_schema::DataType::Int64,
+                                    true,
+                                )
+                                .with_metadata(HashMap::from([(
+                                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                                    "7".to_string(),
+                                )]))]
+                                .into(),
+                            ),
+                            true,
+                        )
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "8".to_string(),
+                        )]))]
+                        .into(),
+                    ),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "4".to_string(),
+                )])),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as 
ArrayRef;
+        let col1 = Arc::new(StructArray::new(
+            vec![
+                arrow_schema::Field::new("sub_col", 
arrow_schema::DataType::Int64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "5".to_string(),
+                    )])),
+            ]
+            .into(),
+            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+            None,
+        ));
+        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
+            "test";
+            1024
+        ])) as ArrayRef;
+        let col3 = Arc::new({
+            let list_parts = 
arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+                Some(
+                    vec![Some(1),]
+                );
+                1024
+            ])
+            .into_parts();
+            arrow_array::ListArray::new(
+                
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "6".to_string(),
+                )]))),
+                list_parts.1,
+                list_parts.2,
+                list_parts.3,
+            )
+        }) as ArrayRef;
+        let col4 = Arc::new(StructArray::new(
+            vec![arrow_schema::Field::new(
+                "sub_col",
+                arrow_schema::DataType::Struct(
+                    vec![arrow_schema::Field::new(
+                        "sub_sub_col",
+                        arrow_schema::DataType::Int64,
+                        true,
+                    )
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "7".to_string(),
+                    )]))]
+                    .into(),
+                ),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "8".to_string(),
+            )]))]
+            .into(),
+            vec![Arc::new(StructArray::new(
+                vec![
+                    arrow_schema::Field::new("sub_sub_col", 
arrow_schema::DataType::Int64, true)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "7".to_string(),
+                        )])),
+                ]
+                .into(),
+                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+                None,
+            ))],
+            None,
+        ));
+        let to_write =
+            RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, 
col4]).unwrap();
+
+        // prepare writer
+        let pb = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            to_write.schema(),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        );
+        let mut data_file_writer = DataFileWriterBuilder::new(pb)
+            .build(DataFileWriterConfig::new(None))
+            .await?;
+
+        // write
+        data_file_writer.write(to_write.clone()).await?;
+        let res = data_file_writer.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res.into_iter().next().unwrap();
+
+        // check
+        check_parquet_data_file(&file_io, &data_file, &to_write).await;
+
+        Ok(())
+    }
+}
diff --git a/crates/iceberg/src/writer/mod.rs 
b/crates/iceberg/src/writer/base_writer/mod.rs
similarity index 60%
copy from crates/iceberg/src/writer/mod.rs
copy to crates/iceberg/src/writer/base_writer/mod.rs
index ac79d7b..37da2ab 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/base_writer/mod.rs
@@ -15,21 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The iceberg writer module.
+//! Base writer module contains the basic writers provided by iceberg: 
`DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
 
-use crate::spec::DataFileBuilder;
-
-pub mod file_writer;
-
-type DefaultOutput = Vec<DataFileBuilder>;
-
-/// The current file status of iceberg writer. It implement for the writer 
which write a single
-/// file.
-pub trait CurrentFileStatus {
-    /// Get the current file path.
-    fn current_file_path(&self) -> String;
-    /// Get the current file row number.
-    fn current_row_num(&self) -> usize;
-    /// Get the current file written size.
-    fn current_written_size(&self) -> usize;
-}
+pub mod data_file_writer;
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs 
b/crates/iceberg/src/writer/file_writer/mod.rs
index f2848f4..0340df6 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -17,8 +17,8 @@
 
 //! This module contains the writer for data file format supported by iceberg: 
parquet, orc.
 
-use super::{CurrentFileStatus, DefaultOutput};
-use crate::Result;
+use super::CurrentFileStatus;
+use crate::{spec::DataFileBuilder, Result};
 use arrow_array::RecordBatch;
 use futures::Future;
 
@@ -28,6 +28,8 @@ mod track_writer;
 
 pub mod location_generator;
 
+type DefaultOutput = Vec<DataFileBuilder>;
+
 /// File writer builder trait.
 pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
     /// The associated file writer type.
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs 
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 3ec1a1b..b743d84 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -256,9 +256,7 @@ mod tests {
     use arrow_array::Int64Array;
     use arrow_array::RecordBatch;
     use arrow_array::StructArray;
-    use bytes::Bytes;
-    use futures::AsyncReadExt;
-    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use arrow_select::concat::concat_batches;
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
     use tempfile::TempDir;
 
@@ -267,6 +265,7 @@ mod tests {
     use crate::spec::Struct;
     use 
crate::writer::file_writer::location_generator::test::MockLocationGenerator;
     use 
crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+    use crate::writer::tests::check_parquet_data_file;
 
     #[derive(Clone)]
     struct TestLocationGen;
@@ -318,53 +317,9 @@ mod tests {
             .build()
             .unwrap();
 
-        // read the written file
-        let mut input_file = file_io
-            .new_input(data_file.file_path.clone())
-            .unwrap()
-            .reader()
-            .await
-            .unwrap();
-        let mut res = vec![];
-        let file_size = input_file.read_to_end(&mut res).await.unwrap();
-        let reader_builder = 
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
-        let metadata = reader_builder.metadata().clone();
-
-        // check data
-        let mut reader = reader_builder.build().unwrap();
-        let res = reader.next().unwrap().unwrap();
-        assert_eq!(to_write, res);
-        let res = reader.next().unwrap().unwrap();
-        assert_eq!(to_write_null, res);
-
-        // check metadata
-        assert_eq!(metadata.num_row_groups(), 1);
-        assert_eq!(metadata.row_group(0).num_columns(), 1);
-        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
-        assert_eq!(
-            data_file.record_count,
-            metadata
-                .row_groups()
-                .iter()
-                .map(|group| group.num_rows())
-                .sum::<i64>() as u64
-        );
-        assert_eq!(data_file.file_size_in_bytes, file_size as u64);
-        assert_eq!(data_file.column_sizes.len(), 1);
-        assert_eq!(
-            *data_file.column_sizes.get(&0).unwrap(),
-            metadata.row_group(0).column(0).compressed_size() as u64
-        );
-        assert_eq!(data_file.value_counts.len(), 1);
-        assert_eq!(*data_file.value_counts.get(&0).unwrap(), 2048);
-        assert_eq!(data_file.null_value_counts.len(), 1);
-        assert_eq!(*data_file.null_value_counts.get(&0).unwrap(), 1024);
-        assert_eq!(data_file.key_metadata.len(), 0);
-        assert_eq!(data_file.split_offsets.len(), 1);
-        assert_eq!(
-            *data_file.split_offsets.first().unwrap(),
-            metadata.row_group(0).file_offset().unwrap()
-        );
+        // check the written file
+        let expect_batch = concat_batches(&schema, vec![&to_write, 
&to_write_null]).unwrap();
+        check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
 
         Ok(())
     }
@@ -397,7 +352,7 @@ mod tests {
                         )
                         .with_metadata(HashMap::from([(
                             PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "-1".to_string(),
+                            "5".to_string(),
                         )]))]
                         .into(),
                     ),
@@ -416,7 +371,7 @@ mod tests {
                         arrow_schema::Field::new("item", 
arrow_schema::DataType::Int64, true)
                             .with_metadata(HashMap::from([(
                                 PARQUET_FIELD_ID_META_KEY.to_string(),
-                                "-1".to_string(),
+                                "6".to_string(),
                             )])),
                     )),
                     true,
@@ -438,7 +393,7 @@ mod tests {
                                 )
                                 .with_metadata(HashMap::from([(
                                     PARQUET_FIELD_ID_META_KEY.to_string(),
-                                    "-1".to_string(),
+                                    "7".to_string(),
                                 )]))]
                                 .into(),
                             ),
@@ -446,7 +401,7 @@ mod tests {
                         )
                         .with_metadata(HashMap::from([(
                             PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "-1".to_string(),
+                            "8".to_string(),
                         )]))]
                         .into(),
                     ),
@@ -465,7 +420,7 @@ mod tests {
                 arrow_schema::Field::new("sub_col", 
arrow_schema::DataType::Int64, true)
                     .with_metadata(HashMap::from([(
                         PARQUET_FIELD_ID_META_KEY.to_string(),
-                        "-1".to_string(),
+                        "5".to_string(),
                     )])),
             ]
             .into(),
@@ -487,7 +442,7 @@ mod tests {
             arrow_array::ListArray::new(
                 
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
                     PARQUET_FIELD_ID_META_KEY.to_string(),
-                    "-1".to_string(),
+                    "6".to_string(),
                 )]))),
                 list_parts.1,
                 list_parts.2,
@@ -505,7 +460,7 @@ mod tests {
                     )
                     .with_metadata(HashMap::from([(
                         PARQUET_FIELD_ID_META_KEY.to_string(),
-                        "-1".to_string(),
+                        "7".to_string(),
                     )]))]
                     .into(),
                 ),
@@ -513,7 +468,7 @@ mod tests {
             )
             .with_metadata(HashMap::from([(
                 PARQUET_FIELD_ID_META_KEY.to_string(),
-                "-1".to_string(),
+                "8".to_string(),
             )]))]
             .into(),
             vec![Arc::new(StructArray::new(
@@ -521,7 +476,7 @@ mod tests {
                     arrow_schema::Field::new("sub_sub_col", 
arrow_schema::DataType::Int64, true)
                         .with_metadata(HashMap::from([(
                             PARQUET_FIELD_ID_META_KEY.to_string(),
-                            "-1".to_string(),
+                            "7".to_string(),
                         )])),
                 ]
                 .into(),
@@ -556,57 +511,8 @@ mod tests {
             .build()
             .unwrap();
 
-        // read the written file
-        let mut input_file = file_io
-            .new_input(data_file.file_path.clone())
-            .unwrap()
-            .reader()
-            .await
-            .unwrap();
-        let mut res = vec![];
-        let file_size = input_file.read_to_end(&mut res).await.unwrap();
-        let reader_builder = 
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
-        let metadata = reader_builder.metadata().clone();
-
-        // check data
-        let mut reader = reader_builder.build().unwrap();
-        let res = reader.next().unwrap().unwrap();
-        assert_eq!(to_write, res);
-
-        // check metadata
-        assert_eq!(metadata.num_row_groups(), 1);
-        assert_eq!(metadata.row_group(0).num_columns(), 5);
-        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
-        assert_eq!(
-            data_file.record_count,
-            metadata
-                .row_groups()
-                .iter()
-                .map(|group| group.num_rows())
-                .sum::<i64>() as u64
-        );
-        assert_eq!(data_file.file_size_in_bytes, file_size as u64);
-        assert_eq!(data_file.column_sizes.len(), 5);
-        assert_eq!(
-            *data_file.column_sizes.get(&0).unwrap(),
-            metadata.row_group(0).column(0).compressed_size() as u64
-        );
-        assert_eq!(data_file.value_counts.len(), 5);
-        data_file
-            .value_counts
-            .iter()
-            .for_each(|(_, v)| assert_eq!(*v, 1024));
-        assert_eq!(data_file.null_value_counts.len(), 5);
-        data_file
-            .null_value_counts
-            .iter()
-            .for_each(|(_, v)| assert_eq!(*v, 0));
-        assert_eq!(data_file.key_metadata.len(), 0);
-        assert_eq!(data_file.split_offsets.len(), 1);
-        assert_eq!(
-            *data_file.split_offsets.first().unwrap(),
-            metadata.row_group(0).file_offset().unwrap()
-        );
+        // check the written file
+        check_parquet_data_file(&file_io, &data_file, &to_write).await;
 
         Ok(())
     }
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index ac79d7b..7618d2e 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -15,13 +15,69 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The iceberg writer module.
-
-use crate::spec::DataFileBuilder;
+//! Iceberg writer module.
+//!
+//! The writer API is designed to be extensible and flexible. Each writer is 
decoupled and can be created and configured independently. Users can:
+//! 1. Customize the writer using the writer trait.
+//! 2. Combine different writers to build a writer which has complex write 
logic.
+//!
+//! There are two kinds of writers:
+//! 1. FileWriter: Focuses on writing record batches to different physical file 
formats (such as parquet, orc).
+//! 2. IcebergWriter: Focuses on the logical format of the iceberg table. It 
ultimately writes the data using the FileWriter.
+//!
+//! # Simple example for data file writer:
+//! ```ignore
+//! // Create a parquet file writer builder. The parameters can be taken from the table.
+//! let file_writer_builder = ParquetWriterBuilder::new(
+//!    WriterProperties::builder().build(),
+//!    schema,
+//!    file_io.clone(),
+//!    location_gen,
+//!    file_name_gen,
+//! );
+//! // Create a data file writer builder using the parquet file writer builder.
+//! let data_file_writer_builder = DataFileWriterBuilder::new(file_writer_builder);
+//! // Build the data file writer; the config carries the optional partition value.
+//! let mut data_file_writer = data_file_writer_builder
+//!    .build(DataFileWriterConfig::new(None)).await.unwrap();
+//!
+//! data_file_writer.write(record_batch).await.unwrap();
+//! let data_files = data_file_writer.close().await.unwrap();
+//! ```
 
+pub mod base_writer;
 pub mod file_writer;
 
-type DefaultOutput = Vec<DataFileBuilder>;
+use crate::{spec::DataFile, Result};
+use arrow_array::RecordBatch;
+
+type DefaultInput = RecordBatch;
+type DefaultOutput = Vec<DataFile>;
+
+/// The builder for iceberg writer.
+#[async_trait::async_trait]
+pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
+    Send + Clone + 'static
+{
+    /// The associated writer type.
+    type R: IcebergWriter<I, O>;
+    /// The associated writer config type used to build the writer.
+    type C;
+    /// Build the iceberg writer.
+    async fn build(self, config: Self::C) -> Result<Self::R>;
+}
+
+/// The iceberg writer used to write data to iceberg table.
+#[async_trait::async_trait]
+pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
+    /// Write data to iceberg table.
+    async fn write(&mut self, input: I) -> Result<()>;
+    /// Close the writer and return the written data files.
+    /// If close fails, the data written before may be lost. Users may need 
to recreate the writer and rewrite the data again.
+    /// # NOTE
+    /// After close, whether it succeeds or fails, the writer should never be 
used again, otherwise the writer will panic.
+    async fn close(&mut self) -> Result<O>;
+}
 
 /// The current file status of iceberg writer. It implement for the writer 
which write a single
 /// file.
@@ -33,3 +89,107 @@ pub trait CurrentFileStatus {
     /// Get the current file written size.
     fn current_written_size(&self) -> usize;
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow_array::RecordBatch;
+    use arrow_schema::Schema;
+    use arrow_select::concat::concat_batches;
+    use bytes::Bytes;
+    use futures::AsyncReadExt;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+    use crate::{
+        io::FileIO,
+        spec::{DataFile, DataFileFormat},
+    };
+
+    use super::IcebergWriter;
+
+    // This function is used to guarantee the trait can be used as an object 
safe trait.
+    async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
+        let _ = w
+            .write(RecordBatch::new_empty(Schema::empty().into()))
+            .await;
+        let _ = w.close().await;
+    }
+
+    // This function checks:
+    // The data of the written parquet file is correct.
+    // The metadata of the data file is consistent with the written parquet 
file.
+    pub(crate) async fn check_parquet_data_file(
+        file_io: &FileIO,
+        data_file: &DataFile,
+        batch: &RecordBatch,
+    ) {
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+
+        // read the written file
+        let mut input_file = file_io
+            .new_input(data_file.file_path.clone())
+            .unwrap()
+            .reader()
+            .await
+            .unwrap();
+        let mut res = vec![];
+        let file_size = input_file.read_to_end(&mut res).await.unwrap();
+        let reader_builder = 
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+        let metadata = reader_builder.metadata().clone();
+
+        // check data
+        let reader = reader_builder.build().unwrap();
+        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
+        let res = concat_batches(&batch.schema(), &batches).unwrap();
+        assert_eq!(*batch, res);
+
+        // check metadata
+        let expect_column_num = batch.num_columns();
+
+        assert_eq!(
+            data_file.record_count,
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64
+        );
+
+        assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+
+        assert_eq!(data_file.column_sizes.len(), expect_column_num);
+        data_file.column_sizes.iter().for_each(|(&k, &v)| {
+            let expect = metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.column(k as usize).compressed_size())
+                .sum::<i64>() as u64;
+            assert_eq!(v, expect);
+        });
+
+        assert_eq!(data_file.value_counts.len(), expect_column_num);
+        data_file.value_counts.iter().for_each(|(_, &v)| {
+            let expect = metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64;
+            assert_eq!(v, expect);
+        });
+
+        assert_eq!(data_file.null_value_counts.len(), expect_column_num);
+        data_file.null_value_counts.iter().for_each(|(&k, &v)| {
+            let expect = batch.column(k as usize).null_count() as u64;
+            assert_eq!(v, expect);
+        });
+
+        assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
+        data_file
+            .split_offsets
+            .iter()
+            .enumerate()
+            .for_each(|(i, &v)| {
+                let expect = metadata.row_groups()[i].file_offset().unwrap();
+                assert_eq!(v, expect);
+            });
+    }
+}

Reply via email to