This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new ad89eac0 feat: Add equality delete writer (#372)
ad89eac0 is described below
commit ad89eac02712ceac2c3cff6bf0fe5d1b6e289a26
Author: Yue Deng <[email protected]>
AuthorDate: Tue Oct 15 11:47:11 2024 +0800
feat: Add equality delete writer (#372)
* feat: add EqualityDeleteWriter
* WIP: add test cases
* fix: move delete schema out of writer
* test: add test case for equality delete writer
* fix: refactor projector
* fix: fix projector
* fix: add result
* test: add float and double column test for equality delete writer
* fmt
* fix: compatibility with #364
* fix: remove unwrap
* fix: minor
---
.../writer/base_writer/equality_delete_writer.rs | 589 +++++++++++++++++++++
crates/iceberg/src/writer/base_writer/mod.rs | 1 +
2 files changed, 590 insertions(+)
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
new file mode 100644
index 00000000..ba198821
--- /dev/null
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -0,0 +1,589 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides `EqualityDeleteWriter`.
+
+use arrow_array::{ArrayRef, RecordBatch, StructArray};
+use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef};
+use itertools::Itertools;
+
+use crate::spec::{DataFile, Struct};
+use crate::writer::file_writer::FileWriter;
+use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter,
IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
+
+/// Builder for `EqualityDeleteWriter`.
+#[derive(Clone)]
+pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
+ inner: B,
+}
+
+impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
+ /// Create a new `EqualityDeleteFileWriterBuilder` using a
`FileWriterBuilder`.
+ pub fn new(inner: B) -> Self {
+ Self { inner }
+ }
+}
+
+/// Config for `EqualityDeleteWriter`.
+pub struct EqualityDeleteWriterConfig {
+ equality_ids: Vec<usize>,
+ projector: FieldProjector,
+ schema: SchemaRef,
+ partition_value: Struct,
+}
+
+impl EqualityDeleteWriterConfig {
+ /// Create a new `DataFileWriterConfig` with equality ids.
+ pub fn new(
+ equality_ids: Vec<usize>,
+ projector: FieldProjector,
+ schema: Schema,
+ partition_value: Option<Struct>,
+ ) -> Self {
+ Self {
+ equality_ids,
+ projector,
+ schema: schema.into(),
+ partition_value: partition_value.unwrap_or(Struct::empty()),
+ }
+ }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriterBuilder for
EqualityDeleteFileWriterBuilder<B> {
+ type R = EqualityDeleteFileWriter<B>;
+ type C = EqualityDeleteWriterConfig;
+
+ async fn build(self, config: Self::C) -> Result<Self::R> {
+ Ok(EqualityDeleteFileWriter {
+ inner_writer: Some(self.inner.clone().build().await?),
+ projector: config.projector,
+ delete_schema_ref: config.schema,
+ equality_ids: config.equality_ids,
+ partition_value: config.partition_value,
+ })
+ }
+}
+
+/// A writer write data
+pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
+ inner_writer: Option<B::R>,
+ projector: FieldProjector,
+ delete_schema_ref: SchemaRef,
+ equality_ids: Vec<usize>,
+ partition_value: Struct,
+}
+
+impl<B: FileWriterBuilder> EqualityDeleteFileWriter<B> {
+ fn project_record_batch_columns(&self, batch: RecordBatch) ->
Result<RecordBatch> {
+ RecordBatch::try_new(
+ self.delete_schema_ref.clone(),
+ self.projector.project(batch.columns())?,
+ )
+ .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
+ }
+}
+
+#[async_trait::async_trait]
+impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
+ async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+ let batch = self.project_record_batch_columns(batch)?;
+ if let Some(writer) = self.inner_writer.as_mut() {
+ writer.write(&batch).await
+ } else {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ "Equality delete inner writer does not exist",
+ ))
+ }
+ }
+
+ async fn close(&mut self) -> Result<Vec<DataFile>> {
+ if let Some(writer) = self.inner_writer.take() {
+ Ok(writer
+ .close()
+ .await?
+ .into_iter()
+ .map(|mut res| {
+ res.content(crate::spec::DataContentType::EqualityDeletes);
+ res.equality_ids(self.equality_ids.iter().map(|id| *id as
i32).collect_vec());
+ res.partition(self.partition_value.clone());
+ res.build().expect("msg")
+ })
+ .collect_vec())
+ } else {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ "Equality delete inner writer does not exist",
+ ))
+ }
+ }
+}
+
+/// Help to project specific field from `RecordBatch`` according to the column
id.
+pub struct FieldProjector {
+ index_vec_vec: Vec<Vec<usize>>,
+}
+
+impl FieldProjector {
+ /// Init FieldProjector
+ pub fn new(
+ batch_fields: &Fields,
+ column_ids: &[usize],
+ column_id_meta_key: &str,
+ ) -> Result<(Self, Fields)> {
+ let mut index_vec_vec = Vec::with_capacity(column_ids.len());
+ let mut fields = Vec::with_capacity(column_ids.len());
+ for &id in column_ids {
+ let mut index_vec = vec![];
+ if let Ok(field) = Self::fetch_column_index(
+ batch_fields,
+ &mut index_vec,
+ id as i64,
+ column_id_meta_key,
+ ) {
+ fields.push(field.clone());
+ index_vec_vec.push(index_vec);
+ } else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Can't find source column id or column data type
invalid: {}",
+ id
+ ),
+ ));
+ }
+ }
+ Ok((Self { index_vec_vec }, Fields::from_iter(fields)))
+ }
+
+ fn fetch_column_index(
+ fields: &Fields,
+ index_vec: &mut Vec<usize>,
+ col_id: i64,
+ column_id_meta_key: &str,
+ ) -> Result<FieldRef> {
+ for (pos, field) in fields.iter().enumerate() {
+ match field.data_type() {
+ DataType::Float16 | DataType::Float32 | DataType::Float64 => {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Delete column data type cannot be float or double",
+ ));
+ }
+ _ => {
+ let id: i64 = field
+ .metadata()
+ .get(column_id_meta_key)
+ .ok_or_else(|| Error::new(ErrorKind::DataInvalid,
"column_id must be set"))?
+ .parse::<i64>()
+ .map_err(|_| {
+ Error::new(ErrorKind::DataInvalid, "column_id must
be parsable as i64")
+ })?;
+ if col_id == id {
+ index_vec.push(pos);
+ return Ok(field.clone());
+ }
+ if let DataType::Struct(inner) = field.data_type() {
+ let res =
+ Self::fetch_column_index(inner, index_vec, col_id,
column_id_meta_key);
+ if !index_vec.is_empty() {
+ index_vec.push(pos);
+ return res;
+ }
+ }
+ }
+ }
+ }
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Column id not found in fields",
+ ))
+ }
+ /// Do projection with batch
+ pub fn project(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
+ self.index_vec_vec
+ .iter()
+ .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec))
+ .collect::<Result<Vec<_>>>()
+ }
+
+ fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) ->
Result<ArrayRef> {
+ let mut rev_iterator = index_vec.iter().rev();
+ let mut array = batch[*rev_iterator.next().unwrap()].clone();
+ for idx in rev_iterator {
+ array = array
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .ok_or(Error::new(
+ ErrorKind::Unexpected,
+ "Cannot convert Array to StructArray",
+ ))?
+ .column(*idx)
+ .clone();
+ }
+ Ok(array)
+ }
+}
+
#[cfg(test)]
mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, ListArray, RecordBatch, StringArray, StructArray};
    use arrow_schema::{DataType, Field, Schema};
    use arrow_select::concat::concat_batches;
    use itertools::Itertools;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use crate::io::{FileIO, FileIOBuilder};
    use crate::spec::{DataFile, DataFileFormat};
    use crate::writer::base_writer::equality_delete_writer::{
        EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, FieldProjector,
    };
    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::{IcebergWriter, IcebergWriterBuilder};

    /// Builds a nullable field whose Parquet field-id metadata is `id`.
    fn field_with_id(name: &str, data_type: DataType, id: i32) -> Field {
        Field::new(name, data_type, true).with_metadata(HashMap::from([(
            PARQUET_FIELD_ID_META_KEY.to_string(),
            id.to_string(),
        )]))
    }

    /// Reads back the Parquet file described by `data_file` and checks that its
    /// rows equal `batch` and that the recorded file metadata matches the file.
    async fn check_parquet_data_file_with_equality_delete_write(
        file_io: &FileIO,
        data_file: &DataFile,
        batch: &RecordBatch,
    ) {
        assert_eq!(data_file.file_format, DataFileFormat::Parquet);

        // read the written file
        let input_file = file_io.new_input(data_file.file_path.clone()).unwrap();
        let input_content = input_file.read().await.unwrap();
        let reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
        let metadata = reader_builder.metadata().clone();

        // check data
        let reader = reader_builder.build().unwrap();
        let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
        let res = concat_batches(&batch.schema(), &batches).unwrap();
        assert_eq!(*batch, res);

        // check metadata
        let expect_column_num = batch.num_columns();
        let total_rows = metadata
            .row_groups()
            .iter()
            .map(|group| group.num_rows())
            .sum::<i64>() as u64;

        assert_eq!(data_file.record_count, total_rows);

        assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);

        assert_eq!(data_file.column_sizes.len(), expect_column_num);
        for (index, id) in data_file.column_sizes().keys().sorted().enumerate() {
            metadata
                .row_groups()
                .iter()
                .map(|group| group.columns())
                .for_each(|column| {
                    assert_eq!(
                        *data_file.column_sizes.get(id).unwrap() as i64,
                        column.get(index).unwrap().compressed_size()
                    );
                });
        }

        assert_eq!(data_file.value_counts.len(), expect_column_num);
        data_file
            .value_counts
            .iter()
            .for_each(|(_, &v)| assert_eq!(v, total_rows));

        // `HashMap` iteration order is arbitrary. Sort the ids, as for
        // `column_sizes` above, so that the i-th id lines up with the i-th column.
        for (index, id) in data_file.null_value_counts().keys().sorted().enumerate() {
            let expect = batch.column(index).null_count() as u64;
            assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
        }

        assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
        data_file
            .split_offsets
            .iter()
            .enumerate()
            .for_each(|(i, &v)| {
                let expect = metadata.row_groups()[i].file_offset().unwrap();
                assert_eq!(v, expect);
            });
    }

    #[tokio::test]
    async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new().unwrap();
        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
        let location_gen =
            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);

        // prepare data
        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
        let sub_col = field_with_id("sub_col", DataType::Int64, 5);
        let list_item = field_with_id("item", DataType::Int64, 6);
        let sub_sub_col = field_with_id("sub_sub_col", DataType::Int64, 7);
        let nested_sub_col = field_with_id(
            "sub_col",
            DataType::Struct(vec![sub_sub_col.clone()].into()),
            8,
        );
        let schema = Schema::new(vec![
            field_with_id("col0", DataType::Int64, 0),
            field_with_id("col1", DataType::Struct(vec![sub_col.clone()].into()), 1),
            field_with_id("col2", DataType::Utf8, 2),
            field_with_id("col3", DataType::List(Arc::new(list_item.clone())), 3),
            field_with_id(
                "col4",
                DataType::Struct(vec![nested_sub_col.clone()].into()),
                4,
            ),
        ]);

        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
        let col1 = Arc::new(StructArray::new(
            vec![sub_col].into(),
            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef],
            None,
        )) as ArrayRef;
        let col2 = Arc::new(StringArray::from_iter_values(vec!["test"; 1024])) as ArrayRef;
        let col3 = {
            // Rebuild the list so that its item field carries the field id.
            let (_, offsets, values, nulls) =
                ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
                    Some(vec![Some(1)]);
                    1024
                ])
                .into_parts();
            Arc::new(ListArray::new(Arc::new(list_item), offsets, values, nulls)) as ArrayRef
        };
        let col4 = Arc::new(StructArray::new(
            vec![nested_sub_col].into(),
            vec![Arc::new(StructArray::new(
                vec![sub_sub_col].into(),
                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef],
                None,
            )) as ArrayRef],
            None,
        )) as ArrayRef;
        let columns = vec![col0, col1, col2, col3, col4];

        let equality_ids = vec![1, 3];
        let (projector, fields) =
            FieldProjector::new(schema.fields(), &equality_ids, PARQUET_FIELD_ID_META_KEY)?;
        let delete_schema = Schema::new(fields);
        let delete_schema_ref = Arc::new(delete_schema.clone());

        // prepare writer
        let to_write = RecordBatch::try_new(Arc::new(schema), columns).unwrap();
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            delete_schema_ref.clone(),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );

        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
            .build(EqualityDeleteWriterConfig::new(
                equality_ids,
                projector,
                delete_schema,
                None,
            ))
            .await?;

        // write
        equality_delete_writer.write(to_write.clone()).await?;
        let res = equality_delete_writer.close().await?;
        assert_eq!(res.len(), 1);
        let data_file = res.into_iter().next().unwrap();

        // check
        let to_write_projected = equality_delete_writer.project_record_batch_columns(to_write)?;
        check_parquet_data_file_with_equality_delete_write(
            &file_io,
            &data_file,
            &to_write_projected,
        )
        .await;
        Ok(())
    }

    #[tokio::test]
    async fn test_equality_delete_float_or_double_column() -> Result<(), anyhow::Error> {
        // Float32, Float64
        let schema = Schema::new(vec![
            field_with_id("col0", DataType::Float32, 0),
            field_with_id("col1", DataType::Float64, 1),
        ]);

        // Neither a float nor a double column may be used as an equality column.
        for equality_id in [0usize, 1] {
            let result =
                FieldProjector::new(schema.fields(), &[equality_id], PARQUET_FIELD_ID_META_KEY);
            assert!(result.is_err(), "equality id {equality_id} should be rejected");
        }

        Ok(())
    }
}
diff --git a/crates/iceberg/src/writer/base_writer/mod.rs
b/crates/iceberg/src/writer/base_writer/mod.rs
index 37da2ab8..37ab97eb 100644
--- a/crates/iceberg/src/writer/base_writer/mod.rs
+++ b/crates/iceberg/src/writer/base_writer/mod.rs
@@ -18,3 +18,4 @@
//! Base writer module contains the basic writer provide by iceberg:
`DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
pub mod data_file_writer;
+pub mod equality_delete_writer;