This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 15b2a7ae Revert "feat: Add equality delete writer (#372)" (#672)
15b2a7ae is described below
commit 15b2a7aeea577b84044bda43e37bd7bc07c4a82c
Author: Xuanwo <[email protected]>
AuthorDate: Tue Oct 15 21:57:10 2024 +0800
Revert "feat: Add equality delete writer (#372)" (#672)
---
.../writer/base_writer/equality_delete_writer.rs | 589 ---------------------
crates/iceberg/src/writer/base_writer/mod.rs | 1 -
2 files changed, 590 deletions(-)
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
deleted file mode 100644
index ba198821..00000000
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ /dev/null
@@ -1,589 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This module provide `EqualityDeleteWriter`.
-
-use arrow_array::{ArrayRef, RecordBatch, StructArray};
-use arrow_schema::{DataType, FieldRef, Fields, Schema, SchemaRef};
-use itertools::Itertools;
-
-use crate::spec::{DataFile, Struct};
-use crate::writer::file_writer::FileWriter;
-use crate::writer::{file_writer::FileWriterBuilder, IcebergWriter,
IcebergWriterBuilder};
-use crate::{Error, ErrorKind, Result};
-
-/// Builder for `EqualityDeleteWriter`.
-#[derive(Clone)]
-pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
- inner: B,
-}
-
-impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
- /// Create a new `EqualityDeleteFileWriterBuilder` using a
`FileWriterBuilder`.
- pub fn new(inner: B) -> Self {
- Self { inner }
- }
-}
-
-/// Config for `EqualityDeleteWriter`.
-pub struct EqualityDeleteWriterConfig {
- equality_ids: Vec<usize>,
- projector: FieldProjector,
- schema: SchemaRef,
- partition_value: Struct,
-}
-
-impl EqualityDeleteWriterConfig {
- /// Create a new `DataFileWriterConfig` with equality ids.
- pub fn new(
- equality_ids: Vec<usize>,
- projector: FieldProjector,
- schema: Schema,
- partition_value: Option<Struct>,
- ) -> Self {
- Self {
- equality_ids,
- projector,
- schema: schema.into(),
- partition_value: partition_value.unwrap_or(Struct::empty()),
- }
- }
-}
-
-#[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for
EqualityDeleteFileWriterBuilder<B> {
- type R = EqualityDeleteFileWriter<B>;
- type C = EqualityDeleteWriterConfig;
-
- async fn build(self, config: Self::C) -> Result<Self::R> {
- Ok(EqualityDeleteFileWriter {
- inner_writer: Some(self.inner.clone().build().await?),
- projector: config.projector,
- delete_schema_ref: config.schema,
- equality_ids: config.equality_ids,
- partition_value: config.partition_value,
- })
- }
-}
-
-/// A writer write data
-pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
- inner_writer: Option<B::R>,
- projector: FieldProjector,
- delete_schema_ref: SchemaRef,
- equality_ids: Vec<usize>,
- partition_value: Struct,
-}
-
-impl<B: FileWriterBuilder> EqualityDeleteFileWriter<B> {
- fn project_record_batch_columns(&self, batch: RecordBatch) ->
Result<RecordBatch> {
- RecordBatch::try_new(
- self.delete_schema_ref.clone(),
- self.projector.project(batch.columns())?,
- )
- .map_err(|err| Error::new(ErrorKind::DataInvalid, format!("{err}")))
- }
-}
-
-#[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
- async fn write(&mut self, batch: RecordBatch) -> Result<()> {
- let batch = self.project_record_batch_columns(batch)?;
- if let Some(writer) = self.inner_writer.as_mut() {
- writer.write(&batch).await
- } else {
- Err(Error::new(
- ErrorKind::Unexpected,
- "Equality delete inner writer does not exist",
- ))
- }
- }
-
- async fn close(&mut self) -> Result<Vec<DataFile>> {
- if let Some(writer) = self.inner_writer.take() {
- Ok(writer
- .close()
- .await?
- .into_iter()
- .map(|mut res| {
- res.content(crate::spec::DataContentType::EqualityDeletes);
- res.equality_ids(self.equality_ids.iter().map(|id| *id as
i32).collect_vec());
- res.partition(self.partition_value.clone());
- res.build().expect("msg")
- })
- .collect_vec())
- } else {
- Err(Error::new(
- ErrorKind::Unexpected,
- "Equality delete inner writer does not exist",
- ))
- }
- }
-}
-
-/// Help to project specific field from `RecordBatch`` according to the column
id.
-pub struct FieldProjector {
- index_vec_vec: Vec<Vec<usize>>,
-}
-
-impl FieldProjector {
- /// Init FieldProjector
- pub fn new(
- batch_fields: &Fields,
- column_ids: &[usize],
- column_id_meta_key: &str,
- ) -> Result<(Self, Fields)> {
- let mut index_vec_vec = Vec::with_capacity(column_ids.len());
- let mut fields = Vec::with_capacity(column_ids.len());
- for &id in column_ids {
- let mut index_vec = vec![];
- if let Ok(field) = Self::fetch_column_index(
- batch_fields,
- &mut index_vec,
- id as i64,
- column_id_meta_key,
- ) {
- fields.push(field.clone());
- index_vec_vec.push(index_vec);
- } else {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Can't find source column id or column data type
invalid: {}",
- id
- ),
- ));
- }
- }
- Ok((Self { index_vec_vec }, Fields::from_iter(fields)))
- }
-
- fn fetch_column_index(
- fields: &Fields,
- index_vec: &mut Vec<usize>,
- col_id: i64,
- column_id_meta_key: &str,
- ) -> Result<FieldRef> {
- for (pos, field) in fields.iter().enumerate() {
- match field.data_type() {
- DataType::Float16 | DataType::Float32 | DataType::Float64 => {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Delete column data type cannot be float or double",
- ));
- }
- _ => {
- let id: i64 = field
- .metadata()
- .get(column_id_meta_key)
- .ok_or_else(|| Error::new(ErrorKind::DataInvalid,
"column_id must be set"))?
- .parse::<i64>()
- .map_err(|_| {
- Error::new(ErrorKind::DataInvalid, "column_id must
be parsable as i64")
- })?;
- if col_id == id {
- index_vec.push(pos);
- return Ok(field.clone());
- }
- if let DataType::Struct(inner) = field.data_type() {
- let res =
- Self::fetch_column_index(inner, index_vec, col_id,
column_id_meta_key);
- if !index_vec.is_empty() {
- index_vec.push(pos);
- return res;
- }
- }
- }
- }
- }
- Err(Error::new(
- ErrorKind::DataInvalid,
- "Column id not found in fields",
- ))
- }
- /// Do projection with batch
- pub fn project(&self, batch: &[ArrayRef]) -> Result<Vec<ArrayRef>> {
- self.index_vec_vec
- .iter()
- .map(|index_vec| Self::get_column_by_index_vec(batch, index_vec))
- .collect::<Result<Vec<_>>>()
- }
-
- fn get_column_by_index_vec(batch: &[ArrayRef], index_vec: &[usize]) ->
Result<ArrayRef> {
- let mut rev_iterator = index_vec.iter().rev();
- let mut array = batch[*rev_iterator.next().unwrap()].clone();
- for idx in rev_iterator {
- array = array
- .as_any()
- .downcast_ref::<StructArray>()
- .ok_or(Error::new(
- ErrorKind::Unexpected,
- "Cannot convert Array to StructArray",
- ))?
- .column(*idx)
- .clone();
- }
- Ok(array)
- }
-}
-
-#[cfg(test)]
-mod test {
- use arrow_select::concat::concat_batches;
- use itertools::Itertools;
- use std::{collections::HashMap, sync::Arc};
-
- use arrow_array::{types::Int64Type, ArrayRef, Int64Array, RecordBatch,
StructArray};
- use parquet::{
- arrow::{arrow_reader::ParquetRecordBatchReaderBuilder,
PARQUET_FIELD_ID_META_KEY},
- file::properties::WriterProperties,
- };
- use tempfile::TempDir;
-
- use crate::{
- io::{FileIO, FileIOBuilder},
- spec::{DataFile, DataFileFormat},
- writer::{
- base_writer::equality_delete_writer::{
- EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
FieldProjector,
- },
- file_writer::{
- location_generator::{test::MockLocationGenerator,
DefaultFileNameGenerator},
- ParquetWriterBuilder,
- },
- IcebergWriter, IcebergWriterBuilder,
- },
- };
-
- async fn check_parquet_data_file_with_equality_delete_write(
- file_io: &FileIO,
- data_file: &DataFile,
- batch: &RecordBatch,
- ) {
- assert_eq!(data_file.file_format, DataFileFormat::Parquet);
-
- // read the written file
- let input_file =
file_io.new_input(data_file.file_path.clone()).unwrap();
- // read the written file
- let input_content = input_file.read().await.unwrap();
- let reader_builder =
-
ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap();
- let metadata = reader_builder.metadata().clone();
-
- // check data
- let reader = reader_builder.build().unwrap();
- let batches = reader.map(|batch| batch.unwrap()).collect::<Vec<_>>();
- let res = concat_batches(&batch.schema(), &batches).unwrap();
- assert_eq!(*batch, res);
-
- // check metadata
- let expect_column_num = batch.num_columns();
-
- assert_eq!(
- data_file.record_count,
- metadata
- .row_groups()
- .iter()
- .map(|group| group.num_rows())
- .sum::<i64>() as u64
- );
-
- assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64);
-
- assert_eq!(data_file.column_sizes.len(), expect_column_num);
-
- for (index, id) in
data_file.column_sizes().keys().sorted().enumerate() {
- metadata
- .row_groups()
- .iter()
- .map(|group| group.columns())
- .for_each(|column| {
- assert_eq!(
- *data_file.column_sizes.get(id).unwrap() as i64,
- column.get(index).unwrap().compressed_size()
- );
- });
- }
-
- assert_eq!(data_file.value_counts.len(), expect_column_num);
- data_file.value_counts.iter().for_each(|(_, &v)| {
- let expect = metadata
- .row_groups()
- .iter()
- .map(|group| group.num_rows())
- .sum::<i64>() as u64;
- assert_eq!(v, expect);
- });
-
- for (index, id) in data_file.null_value_counts().keys().enumerate() {
- let expect = batch.column(index).null_count() as u64;
- assert_eq!(*data_file.null_value_counts.get(id).unwrap(), expect);
- }
-
- assert_eq!(data_file.split_offsets.len(), metadata.num_row_groups());
- data_file
- .split_offsets
- .iter()
- .enumerate()
- .for_each(|(i, &v)| {
- let expect = metadata.row_groups()[i].file_offset().unwrap();
- assert_eq!(v, expect);
- });
- }
-
- #[tokio::test]
- async fn test_equality_delete_writer() -> Result<(), anyhow::Error> {
- let temp_dir = TempDir::new().unwrap();
- let file_io = FileIOBuilder::new_fs_io().build().unwrap();
- let location_gen =
-
MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
- let file_name_gen =
- DefaultFileNameGenerator::new("test".to_string(), None,
DataFileFormat::Parquet);
-
- // prepare data
- // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
- let schema = {
- let fields = vec![
- arrow_schema::Field::new("col0",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "0".to_string(),
- )])),
- arrow_schema::Field::new(
- "col1",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "1".to_string(),
- )])),
- arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8,
true).with_metadata(
- HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string())]),
- ),
- arrow_schema::Field::new(
- "col3",
- arrow_schema::DataType::List(Arc::new(
- arrow_schema::Field::new("item",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )])),
- )),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "3".to_string(),
- )])),
- arrow_schema::Field::new(
- "col4",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "4".to_string(),
- )])),
- ];
- arrow_schema::Schema::new(fields)
- };
- let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as
ArrayRef;
- let col1 = Arc::new(StructArray::new(
- vec![
- arrow_schema::Field::new("sub_col",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "5".to_string(),
- )])),
- ]
- .into(),
- vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
- None,
- ));
- let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
- "test";
- 1024
- ])) as ArrayRef;
- let col3 = Arc::new({
- let list_parts =
arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
- Some(
- vec![Some(1),]
- );
- 1024
- ])
- .into_parts();
- arrow_array::ListArray::new(
-
Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "6".to_string(),
- )]))),
- list_parts.1,
- list_parts.2,
- list_parts.3,
- )
- }) as ArrayRef;
- let col4 = Arc::new(StructArray::new(
- vec![arrow_schema::Field::new(
- "sub_col",
- arrow_schema::DataType::Struct(
- vec![arrow_schema::Field::new(
- "sub_sub_col",
- arrow_schema::DataType::Int64,
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )]))]
- .into(),
- ),
- true,
- )
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "8".to_string(),
- )]))]
- .into(),
- vec![Arc::new(StructArray::new(
- vec![
- arrow_schema::Field::new("sub_sub_col",
arrow_schema::DataType::Int64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "7".to_string(),
- )])),
- ]
- .into(),
- vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
- None,
- ))],
- None,
- ));
- let columns = vec![col0, col1, col2, col3, col4];
-
- let equality_ids = vec![1, 3];
- let (projector, fields) =
- FieldProjector::new(schema.fields(), &equality_ids,
PARQUET_FIELD_ID_META_KEY)?;
- let delete_schema = arrow_schema::Schema::new(fields);
- let delete_schema_ref = Arc::new(delete_schema.clone());
-
- // prepare writer
- let to_write = RecordBatch::try_new(Arc::new(schema.clone()),
columns).unwrap();
- let pb = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- delete_schema_ref.clone(),
- file_io.clone(),
- location_gen,
- file_name_gen,
- );
-
- let mut equality_delete_writer =
EqualityDeleteFileWriterBuilder::new(pb)
- .build(EqualityDeleteWriterConfig::new(
- equality_ids,
- projector,
- delete_schema.clone(),
- None,
- ))
- .await?;
- // write
- equality_delete_writer.write(to_write.clone()).await?;
- let res = equality_delete_writer.close().await?;
- assert_eq!(res.len(), 1);
- let data_file = res.into_iter().next().unwrap();
-
- // check
- let to_write_projected =
equality_delete_writer.project_record_batch_columns(to_write)?;
- check_parquet_data_file_with_equality_delete_write(
- &file_io,
- &data_file,
- &to_write_projected,
- )
- .await;
- Ok(())
- }
-
- #[tokio::test]
- async fn test_equality_delete_float_or_double_column() -> Result<(),
anyhow::Error> {
- // Float32, Float64
- let schema = {
- let fields = vec![
- arrow_schema::Field::new("col0",
arrow_schema::DataType::Float32, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "0".to_string(),
- )])),
- arrow_schema::Field::new("col1",
arrow_schema::DataType::Float64, true)
- .with_metadata(HashMap::from([(
- PARQUET_FIELD_ID_META_KEY.to_string(),
- "1".to_string(),
- )])),
- ];
- arrow_schema::Schema::new(fields)
- };
-
- let equality_id_float = vec![0];
- let result_float = FieldProjector::new(
- schema.fields(),
- &equality_id_float,
- PARQUET_FIELD_ID_META_KEY,
- );
- assert!(result_float.is_err());
-
- let equality_ids_double = vec![1];
- let result_double = FieldProjector::new(
- schema.fields(),
- &equality_ids_double,
- PARQUET_FIELD_ID_META_KEY,
- );
- assert!(result_double.is_err());
-
- Ok(())
- }
-}
diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs
index 37ab97eb..37da2ab8 100644
--- a/crates/iceberg/src/writer/base_writer/mod.rs
+++ b/crates/iceberg/src/writer/base_writer/mod.rs
@@ -18,4 +18,3 @@
//! Base writer module contains the basic writer provide by iceberg: `DataFileWriter`, `PositionDeleteFileWriter`, `EqualityDeleteFileWriter`.
pub mod data_file_writer;
-pub mod equality_delete_writer;