This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new aba6209 feat: init iceberg writer (#275)
aba6209 is described below
commit aba620900e99423bbd3fed969618e67e58a03a7b
Author: ZENOTME <[email protected]>
AuthorDate: Mon Apr 22 22:31:29 2024 +0800
feat: init iceberg writer (#275)
* init iceberg writer
* refine
* refine the interface
---------
Co-authored-by: ZENOTME <[email protected]>
---
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 1 +
.../src/writer/base_writer/data_file_writer.rs | 318 +++++++++++++++++++++
crates/iceberg/src/writer/{ => base_writer}/mod.rs | 19 +-
crates/iceberg/src/writer/file_writer/mod.rs | 6 +-
.../src/writer/file_writer/parquet_writer.rs | 126 ++------
crates/iceberg/src/writer/mod.rs | 168 ++++++++++-
7 files changed, 506 insertions(+), 133 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 48c242f..3c2923d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ array-init = "2"
arrow-arith = { version = "51" }
arrow-array = { version = "51" }
arrow-schema = { version = "51" }
+arrow-select = { version = "51" }
async-stream = "0.3.5"
async-trait = "0.1"
aws-config = "1.1.8"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 5aea856..46f167b 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -35,6 +35,7 @@ array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
new file mode 100644
index 0000000..442c9f1
--- /dev/null
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides `DataFileWriter`.
+
+use crate::spec::{DataContentType, DataFile, Struct};
+use crate::writer::file_writer::FileWriter;
+use crate::writer::CurrentFileStatus;
+use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter,
IcebergWriterBuilder};
+use crate::Result;
+use arrow_array::RecordBatch;
+use itertools::Itertools;
+
+/// Builder for `DataFileWriter`.
+#[derive(Clone)]
+pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
+ inner: B,
+}
+
+impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
+ /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
+ pub fn new(inner: B) -> Self {
+ Self { inner }
+ }
+}
+
+/// Config for `DataFileWriter`.
+pub struct DataFileWriterConfig {
+ partition_value: Struct,
+}
+
+impl DataFileWriterConfig {
+ /// Create a new `DataFileWriterConfig` with partition value.
+ pub fn new(partition_value: Option<Struct>) -> Self {
+ Self {
+ partition_value: partition_value.unwrap_or(Struct::empty()),
+ }
+ }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
+ type R = DataFileWriter<B>;
+ type C = DataFileWriterConfig;
+
+ async fn build(self, config: Self::C) -> Result<Self::R> {
+ Ok(DataFileWriter {
+ inner_writer: Some(self.inner.clone().build().await?),
+ partition_value: config.partition_value,
+ })
+ }
+}
+
+/// A writer that writes data within one spec/partition.
+pub struct DataFileWriter<B: FileWriterBuilder> {
+ inner_writer: Option<B::R>,
+ partition_value: Struct,
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+ async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+ self.inner_writer.as_mut().unwrap().write(&batch).await
+ }
+
+ async fn close(&mut self) -> Result<Vec<DataFile>> {
+ let writer = self.inner_writer.take().unwrap();
+ Ok(writer
+ .close()
+ .await?
+ .into_iter()
+ .map(|mut res| {
+ res.content(DataContentType::Data);
+ res.partition(self.partition_value.clone());
+ res.build().expect("Guaranteed to be valid")
+ })
+ .collect_vec())
+ }
+}
+
+impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
+ fn current_file_path(&self) -> String {
+ self.inner_writer.as_ref().unwrap().current_file_path()
+ }
+
+ fn current_row_num(&self) -> usize {
+ self.inner_writer.as_ref().unwrap().current_row_num()
+ }
+
+ fn current_written_size(&self) -> usize {
+ self.inner_writer.as_ref().unwrap().current_written_size()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::{collections::HashMap, sync::Arc};
+
+ use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch,
StructArray};
+ use parquet::{arrow::PARQUET_FIELD_ID_META_KEY,
file::properties::WriterProperties};
+ use tempfile::TempDir;
+
+ use crate::{
+ io::FileIOBuilder,
+ spec::DataFileFormat,
+ writer::{
+ base_writer::data_file_writer::{DataFileWriterBuilder,
DataFileWriterConfig},
+ file_writer::{
+ location_generator::{test::MockLocationGenerator,
DefaultFileNameGenerator},
+ ParquetWriterBuilder,
+ },
+ tests::check_parquet_data_file,
+ IcebergWriter, IcebergWriterBuilder,
+ },
+ };
+
+ #[tokio::test]
+ async fn test_data_file_writer() -> Result<(), anyhow::Error> {
+ let temp_dir = TempDir::new().unwrap();
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let location_gen =
+
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+ let file_name_gen =
+ DefaultFileNameGenerator::new("test".to_string(), None,
DataFileFormat::Parquet);
+
+ // prepare data
+ // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
+ let schema = {
+ let fields = vec![
+ arrow_schema::Field::new("col0",
arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "0".to_string(),
+ )])),
+ arrow_schema::Field::new(
+ "col1",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_col",
+ arrow_schema::DataType::Int64,
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "5".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8,
true).with_metadata(
+ HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string())]),
+ ),
+ arrow_schema::Field::new(
+ "col3",
+ arrow_schema::DataType::List(Arc::new(
+ arrow_schema::Field::new("item",
arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "6".to_string(),
+ )])),
+ )),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ arrow_schema::Field::new(
+ "col4",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_col",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_sub_col",
+ arrow_schema::DataType::Int64,
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "8".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "4".to_string(),
+ )])),
+ ];
+ Arc::new(arrow_schema::Schema::new(fields))
+ };
+ let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as
ArrayRef;
+ let col1 = Arc::new(StructArray::new(
+ vec![
+ arrow_schema::Field::new("sub_col",
arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "5".to_string(),
+ )])),
+ ]
+ .into(),
+ vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+ None,
+ ));
+ let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
+ "test";
+ 1024
+ ])) as ArrayRef;
+ let col3 = Arc::new({
+ let list_parts =
arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+ Some(
+ vec![Some(1),]
+ );
+ 1024
+ ])
+ .into_parts();
+ arrow_array::ListArray::new(
+
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "6".to_string(),
+ )]))),
+ list_parts.1,
+ list_parts.2,
+ list_parts.3,
+ )
+ }) as ArrayRef;
+ let col4 = Arc::new(StructArray::new(
+ vec![arrow_schema::Field::new(
+ "sub_col",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_sub_col",
+ arrow_schema::DataType::Int64,
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "8".to_string(),
+ )]))]
+ .into(),
+ vec![Arc::new(StructArray::new(
+ vec![
+ arrow_schema::Field::new("sub_sub_col",
arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )])),
+ ]
+ .into(),
+ vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+ None,
+ ))],
+ None,
+ ));
+ let to_write =
+ RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3,
col4]).unwrap();
+
+ // prepare writer
+ let pb = ParquetWriterBuilder::new(
+ WriterProperties::builder().build(),
+ to_write.schema(),
+ file_io.clone(),
+ location_gen,
+ file_name_gen,
+ );
+ let mut data_file_writer = DataFileWriterBuilder::new(pb)
+ .build(DataFileWriterConfig::new(None))
+ .await?;
+
+ // write
+ data_file_writer.write(to_write.clone()).await?;
+ let res = data_file_writer.close().await?;
+ assert_eq!(res.len(), 1);
+ let data_file = res.into_iter().next().unwrap();
+
+ // check
+ check_parquet_data_file(&file_io, &data_file, &to_write).await;
+
+ Ok(())
+ }
+}
diff --git a/crates/iceberg/src/writer/mod.rs
b/crates/iceberg/src/writer/base_writer/mod.rs
similarity index 60%
copy from crates/iceberg/src/writer/mod.rs
copy to crates/iceberg/src/writer/base_writer/mod.rs
index ac79d7b..37da2ab 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/base_writer/mod.rs
@@ -15,21 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-//! The iceberg writer module.
+//! The base writer module contains the basic writers provided by iceberg:
`DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
-use crate::spec::DataFileBuilder;
-
-pub mod file_writer;
-
-type DefaultOutput = Vec<DataFileBuilder>;
-
-/// The current file status of iceberg writer. It implement for the writer
which write a single
-/// file.
-pub trait CurrentFileStatus {
- /// Get the current file path.
- fn current_file_path(&self) -> String;
- /// Get the current file row number.
- fn current_row_num(&self) -> usize;
- /// Get the current file written size.
- fn current_written_size(&self) -> usize;
-}
+pub mod data_file_writer;
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs
b/crates/iceberg/src/writer/file_writer/mod.rs
index f2848f4..0340df6 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -17,8 +17,8 @@
//! This module contains the writer for data file format supported by iceberg:
parquet, orc.
-use super::{CurrentFileStatus, DefaultOutput};
-use crate::Result;
+use super::CurrentFileStatus;
+use crate::{spec::DataFileBuilder, Result};
use arrow_array::RecordBatch;
use futures::Future;
@@ -28,6 +28,8 @@ mod track_writer;
pub mod location_generator;
+type DefaultOutput = Vec<DataFileBuilder>;
+
/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
/// The associated file writer type.
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 3ec1a1b..b743d84 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -256,9 +256,7 @@ mod tests {
use arrow_array::Int64Array;
use arrow_array::RecordBatch;
use arrow_array::StructArray;
- use bytes::Bytes;
- use futures::AsyncReadExt;
- use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+ use arrow_select::concat::concat_batches;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use tempfile::TempDir;
@@ -267,6 +265,7 @@ mod tests {
use crate::spec::Struct;
use
crate::writer::file_writer::location_generator::test::MockLocationGenerator;
use
crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+ use crate::writer::tests::check_parquet_data_file;
#[derive(Clone)]
struct TestLocationGen;
@@ -318,53 +317,9 @@ mod tests {
.build()
.unwrap();
- // read the written file
- let mut input_file = file_io
- .new_input(data_file.file_path.clone())
- .unwrap()
- .reader()
- .await
- .unwrap();
- let mut res = vec![];
- let file_size = input_file.read_to_end(&mut res).await.unwrap();
- let reader_builder =
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
- let metadata = reader_builder.metadata().clone();
-
- // check data
- let mut reader = reader_builder.build().unwrap();
- let res = reader.next().unwrap().unwrap();
- assert_eq!(to_write, res);
- let res = reader.next().unwrap().unwrap();
- assert_eq!(to_write_null, res);
-
- // check metadata
- assert_eq!(metadata.num_row_groups(), 1);
- assert_eq!(metadata.row_group(0).num_columns(), 1);
- assert_eq!(data_file.file_format, DataFileFormat::Parquet);
- assert_eq!(
- data_file.record_count,
- metadata
- .row_groups()
- .iter()
- .map(|group| group.num_rows())
- .sum::<i64>() as u64
- );
- assert_eq!(data_file.file_size_in_bytes, file_size as u64);
- assert_eq!(data_file.column_sizes.len(), 1);
- assert_eq!(
- *data_file.column_sizes.get(&0).unwrap(),
- metadata.row_group(0).column(0).compressed_size() as u64
- );
- assert_eq!(data_file.value_counts.len(), 1);
- assert_eq!(*data_file.value_counts.get(&0).unwrap(), 2048);
- assert_eq!(data_file.null_value_counts.len(), 1);
- assert_eq!(*data_file.null_value_counts.get(&0).unwrap(), 1024);
- assert_eq!(data_file.key_metadata.len(), 0);
- assert_eq!(data_file.split_offsets.len(), 1);
- assert_eq!(
- *data_file.split_offsets.first().unwrap(),
- metadata.row_group(0).file_offset().unwrap()
- );
+ // check the written file
+ let expect_batch = concat_batches(&schema, vec![&to_write,
&to_write_null]).unwrap();
+ check_parquet_data_file(&file_io, &data_file, &expect_batch).await;
Ok(())
}
@@ -397,7 +352,7 @@ mod tests {
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "5".to_string(),
)]))]
.into(),
),
@@ -416,7 +371,7 @@ mod tests {
arrow_schema::Field::new("item",
arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "6".to_string(),
)])),
)),
true,
@@ -438,7 +393,7 @@ mod tests {
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "7".to_string(),
)]))]
.into(),
),
@@ -446,7 +401,7 @@ mod tests {
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "8".to_string(),
)]))]
.into(),
),
@@ -465,7 +420,7 @@ mod tests {
arrow_schema::Field::new("sub_col",
arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "5".to_string(),
)])),
]
.into(),
@@ -487,7 +442,7 @@ mod tests {
arrow_array::ListArray::new(
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "6".to_string(),
)]))),
list_parts.1,
list_parts.2,
@@ -505,7 +460,7 @@ mod tests {
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "7".to_string(),
)]))]
.into(),
),
@@ -513,7 +468,7 @@ mod tests {
)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "8".to_string(),
)]))]
.into(),
vec![Arc::new(StructArray::new(
@@ -521,7 +476,7 @@ mod tests {
arrow_schema::Field::new("sub_sub_col",
arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
- "-1".to_string(),
+ "7".to_string(),
)])),
]
.into(),
@@ -556,57 +511,8 @@ mod tests {
.build()
.unwrap();
- // read the written file
- let mut input_file = file_io
- .new_input(data_file.file_path.clone())
- .unwrap()
- .reader()
- .await
- .unwrap();
- let mut res = vec![];
- let file_size = input_file.read_to_end(&mut res).await.unwrap();
- let reader_builder =
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
- let metadata = reader_builder.metadata().clone();
-
- // check data
- let mut reader = reader_builder.build().unwrap();
- let res = reader.next().unwrap().unwrap();
- assert_eq!(to_write, res);
-
- // check metadata
- assert_eq!(metadata.num_row_groups(), 1);
- assert_eq!(metadata.row_group(0).num_columns(), 5);
- assert_eq!(data_file.file_format, DataFileFormat::Parquet);
- assert_eq!(
- data_file.record_count,
- metadata
- .row_groups()
- .iter()
- .map(|group| group.num_rows())
- .sum::<i64>() as u64
- );
- assert_eq!(data_file.file_size_in_bytes, file_size as u64);
- assert_eq!(data_file.column_sizes.len(), 5);
- assert_eq!(
- *data_file.column_sizes.get(&0).unwrap(),
- metadata.row_group(0).column(0).compressed_size() as u64
- );
- assert_eq!(data_file.value_counts.len(), 5);
- data_file
- .value_counts
- .iter()
- .for_each(|(_, v)| assert_eq!(*v, 1024));
- assert_eq!(data_file.null_value_counts.len(), 5);
- data_file
- .null_value_counts
- .iter()
- .for_each(|(_, v)| assert_eq!(*v, 0));
- assert_eq!(data_file.key_metadata.len(), 0);
- assert_eq!(data_file.split_offsets.len(), 1);
- assert_eq!(
- *data_file.split_offsets.first().unwrap(),
- metadata.row_group(0).file_offset().unwrap()
- );
+ // check the written file
+ check_parquet_data_file(&file_io, &data_file, &to_write).await;
Ok(())
}
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index ac79d7b..7618d2e 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -15,13 +15,69 @@
// specific language governing permissions and limitations
// under the License.
-//! The iceberg writer module.
-
-use crate::spec::DataFileBuilder;
+//! Iceberg writer module.
+//!
+//! The writer API is designed to be extensible and flexible. Each writer is
decoupled and can be created and configured independently. Users can:
+//! 1. Customize the writer using the writer trait.
+//! 2. Combine different writers to build a writer which has complex write
logic.
+//!
+//! There are two kinds of writers:
+//! 1. FileWriter: Focuses on writing record batches to different physical file
formats (such as parquet, orc).
+//! 2. IcebergWriter: Focuses on the logical format of the iceberg table. It
ultimately writes the data using the FileWriter.
+//!
+//! # Simple example for data file writer:
+//! ```ignore
+//! // Create a parquet file writer builder. The parameters can be obtained from the table.
+//! let file_writer_builder = ParquetWriterBuilder::new(
+//! WriterProperties::builder().build(),
+//! schema,
+//! file_io.clone(),
+//! location_gen,
+//! file_name_gen,
+//! );
+//! // Create a data file writer builder using the parquet file writer builder.
+//! let data_file_writer_builder = DataFileWriterBuilder::new(file_writer_builder);
+//! // Build the data file writer; `None` falls back to an empty partition value.
+//! let config = DataFileWriterConfig::new(None);
+//! let mut data_file_writer = data_file_writer_builder.build(config).await.unwrap();
+//!
+//! data_file_writer.write(record_batch).await.unwrap();
+//! let data_files = data_file_writer.close().await.unwrap();
+//! ```
+pub mod base_writer;
pub mod file_writer;
-type DefaultOutput = Vec<DataFileBuilder>;
+use crate::{spec::DataFile, Result};
+use arrow_array::RecordBatch;
+
+type DefaultInput = RecordBatch;
+type DefaultOutput = Vec<DataFile>;
+
+/// The builder for iceberg writer.
+#[async_trait::async_trait]
+pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
+ Send + Clone + 'static
+{
+ /// The associated writer type.
+ type R: IcebergWriter<I, O>;
+ /// The associated writer config type used to build the writer.
+ type C;
+ /// Build the iceberg writer.
+ async fn build(self, config: Self::C) -> Result<Self::R>;
+}
+
+/// The iceberg writer used to write data to iceberg table.
+#[async_trait::async_trait]
+pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
+ /// Write data to iceberg table.
+ async fn write(&mut self, input: I) -> Result<()>;
+ /// Close the writer and return the written data files.
+ /// If close fails, the data written before may be lost. The user may need
to recreate the writer and rewrite the data.
+ /// # NOTE
+ /// After close, whether it succeeds or fails, the writer should never be
used again; otherwise the writer will panic.
+ async fn close(&mut self) -> Result<O>;
+}
/// The current file status of iceberg writer. It implement for the writer
which write a single
/// file.
@@ -33,3 +89,107 @@ pub trait CurrentFileStatus {
/// Get the current file written size.
fn current_written_size(&self) -> usize;
}
+
+#[cfg(test)]
+mod tests {
+ use arrow_array::RecordBatch;
+ use arrow_schema::Schema;
+ use arrow_select::concat::concat_batches;
+ use bytes::Bytes;
+ use futures::AsyncReadExt;
+ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+ use crate::{
+ io::FileIO,
+ spec::{DataFile, DataFileFormat},
+ };
+
+ use super::IcebergWriter;
+
+ // This function is used to guarantee the trait can be used as an
object-safe trait.
+ async fn _guarantee_object_safe(mut w: Box<dyn IcebergWriter>) {
+ let _ = w
+ .write(RecordBatch::new_empty(Schema::empty().into()))
+ .await;
+ let _ = w.close().await;
+ }
+
+ // This function checks that:
+ // - The data of the written parquet file is correct.
+ // - The metadata of the data file is consistent with the written parquet
file.
+ pub(crate) async fn check_parquet_data_file(
+ file_io: &FileIO,
+ data_file: &DataFile,
+ batch: &RecordBatch,
+ ) {
+ assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+
+ // read the written file
+ let mut input_file = file_io
+ .new_input(data_file.file_path.clone())
+ .unwrap()
+ .reader()
+ .await
+ .unwrap();
+ let mut res = vec![];
+ let file_size = input_file.read_to_end(&mut res).await.unwrap();
+ let reader_builder =
ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+ let metadata = reader_builder.metadata().clone();
+
+ // check data
+ let reader = reader_builder.build().unwrap();
+ let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
+ let res = concat_batches(&batch.schema(), &batches).unwrap();
+ assert_eq!(*batch, res);
+
+ // check metadata
+ let expect_column_num = batch.num_columns();
+
+ assert_eq!(
+ data_file.record_count,
+ metadata
+ .row_groups()
+ .iter()
+ .map(|group| group.num_rows())
+ .sum::<i64>() as u64
+ );
+
+ assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+
+ assert_eq!(data_file.column_sizes.len(), expect_column_num);
+ data_file.column_sizes.iter().for_each(|(&k, &v)| {
+ let expect = metadata
+ .row_groups()
+ .iter()
+ .map(|group| group.column(k as usize).compressed_size())
+ .sum::<i64>() as u64;
+ assert_eq!(v, expect);
+ });
+
+ assert_eq!(data_file.value_counts.len(), expect_column_num);
+ data_file.value_counts.iter().for_each(|(_, &v)| {
+ let expect = metadata
+ .row_groups()
+ .iter()
+ .map(|group| group.num_rows())
+ .sum::<i64>() as u64;
+ assert_eq!(v, expect);
+ });
+
+ assert_eq!(data_file.null_value_counts.len(), expect_column_num);
+ data_file.null_value_counts.iter().for_each(|(&k, &v)| {
+ let expect = batch.column(k as usize).null_count() as u64;
+ assert_eq!(v, expect);
+ });
+
+ assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
+ data_file
+ .split_offsets
+ .iter()
+ .enumerate()
+ .for_each(|(i, &v)| {
+ let expect = metadata.row_groups()[i].file_offset().unwrap();
+ assert_eq!(v, expect);
+ });
+ }
+}