This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git

The following commit(s) were added to refs/heads/main by this push:
     new 2a3909f9 Scan Delete Support Part 6: Equality Delete Parsing (#1017)
2a3909f9 is described below

commit 2a3909f98aa4d30e7df462f04096b61660ae5f5b
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Tue Sep 9 02:25:17 2025 +0100

    Scan Delete Support Part 6: Equality Delete Parsing (#1017)

    Concludes the series of scan delete file support PRs.

    * Adds parsing of equality delete files to `DeleteFileLoader`

    Issue: https://github.com/apache/iceberg-rust/issues/630

    ---------

    Co-authored-by: Renjie Liu <liurenjie2...@gmail.com>
---
 .../src/arrow/caching_delete_file_loader.rs | 347 ++++++++++++++++++++-
 crates/iceberg/src/arrow/value.rs           |  17 +-
 crates/iceberg/src/scan/context.rs          |   1 -
 3 files changed, 352 insertions(+), 13 deletions(-)
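For a quick sense of what the new parsing in `caching_delete_file_loader.rs` produces, here is a minimal illustrative sketch (not part of the patch) of how two equality-delete rows over columns `y` and `z` become a residual predicate: each row is turned into a conjunction of `column = value` (or `IS NULL`) terms, negated, and AND-ed into the running predicate, after which `rewrite_not()` pushes the negations inward. `Datum::long` is assumed to be the crate's `i64` constructor; the other calls are the ones used in the diff below.

```rust
// Illustrative sketch only: mirrors the per-row loop in
// parse_equality_deletes_record_batch_stream from the diff below.
// Datum::long is assumed to be the crate's i64 Datum constructor.
use std::ops::Not;

use iceberg::expr::Predicate::AlwaysTrue;
use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;

fn equality_rows_to_predicate() -> Predicate {
    let mut result = AlwaysTrue;

    // Row 1 (y = 1, z = 100) contributes NOT (y = 1 AND z = 100)
    let row1 = Reference::new("y")
        .equal_to(Datum::long(1))
        .and(Reference::new("z").equal_to(Datum::long(100)));
    result = result.and(row1.not());

    // Row 2 (y = 2, z IS NULL) contributes NOT (y = 2 AND z IS NULL)
    let row2 = Reference::new("y")
        .equal_to(Datum::long(2))
        .and(Reference::new("z").is_null());
    result = result.and(row2.not());

    // rewrite_not() pushes the negations inward, yielding roughly:
    // ((y != 1) OR (z != 100)) AND ((y != 2) OR (z IS NOT NULL))
    result.rewrite_not()
}
```

The new test `test_delete_file_loader_parse_equality_deletes` in the diff exercises the same construction end-to-end against a Parquet equality-delete file.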
diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index f0dece75..c6a8943d 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -15,19 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::ops::Not;
+use std::sync::Arc;
 
-use arrow_array::{Int64Array, StringArray};
+use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
 use futures::{StreamExt, TryStreamExt};
 use tokio::sync::oneshot::{Receiver, channel};
 
 use super::delete_filter::DeleteFilter;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
+use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
 use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, SchemaRef};
+use crate::spec::{
+    DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
+    PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type,
+    visit_schema_with_partner,
+};
 use crate::{Error, ErrorKind, Result};
 
 #[derive(Clone, Debug)]
@@ -43,6 +51,7 @@ enum DeleteFileContext {
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
         sender: tokio::sync::oneshot::Sender<Predicate>,
     },
 }
@@ -224,6 +233,7 @@ impl CachingDeleteFileLoader {
                     )
                     .await?,
                     sender,
+                    equality_ids: HashSet::from_iter(task.equality_ids.clone()),
                 })
             }
 
@@ -247,9 +257,11 @@ impl CachingDeleteFileLoader {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;
 
                 sender
                     .send(predicate)
@@ -308,28 +320,341 @@ impl CachingDeleteFileLoader {
         Ok(result)
     }
 
-    /// Parses record batch streams from individual equality delete files
-    ///
-    /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;
+        let mut batch_schema_iceberg: Option<Schema> = None;
+        let accessor = EqDelRecordBatchPartnerAccessor;
+
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let schema = match &batch_schema_iceberg {
+                Some(schema) => schema,
+                None => {
+                    let schema = arrow_schema_to_schema(record_batch.schema().as_ref())?;
+                    batch_schema_iceberg = Some(schema);
+                    batch_schema_iceberg.as_ref().unwrap()
+                }
+            };
+
+            let root_array: ArrayRef = Arc::new(StructArray::from(record_batch));
+
+            let mut processor = EqDelColumnProcessor::new(&equality_ids);
+            visit_schema_with_partner(schema, &root_array, &mut processor, &accessor)?;
+
+            let mut datum_columns_with_names = processor.finish()?;
+            if datum_columns_with_names.is_empty() {
+                continue;
+            }
+
+            // Process the collected columns in lockstep
+            #[allow(clippy::len_zero)]
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        let cell_predicate = if let Some(datum) = item? {
+                            Reference::new(field_name.clone()).equal_to(datum.clone())
+                        } else {
+                            Reference::new(field_name.clone()).is_null()
+                        };
+                        row_predicate = row_predicate.and(cell_predicate)
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
+    }
+}
+
+struct EqDelColumnProcessor<'a> {
+    equality_ids: &'a HashSet<i32>,
+    collected_columns: Vec<(ArrayRef, String, Type)>,
+}
+
+impl<'a> EqDelColumnProcessor<'a> {
+    fn new(equality_ids: &'a HashSet<i32>) -> Self {
+        Self {
+            equality_ids,
+            collected_columns: Vec::with_capacity(equality_ids.len()),
+        }
+    }
+
+    #[allow(clippy::type_complexity)]
+    fn finish(
+        self,
+    ) -> Result<
+        Vec<(
+            Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>>,
+            String,
+        )>,
+    > {
+        self.collected_columns
+            .into_iter()
+            .map(|(array, field_name, field_type)| {
+                let primitive_type = field_type
+                    .as_primitive_type()
+                    .ok_or_else(|| {
+                        Error::new(ErrorKind::Unexpected, "field is not a primitive type")
+                    })?
+                    .clone();
+
+                let lit_vec = arrow_primitive_to_literal(&array, &field_type)?;
+                let datum_iterator: Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>> =
+                    Box::new(lit_vec.into_iter().map(move |c| {
+                        c.map(|literal| {
+                            literal
+                                .as_primitive_literal()
+                                .map(|primitive_literal| {
+                                    Datum::new(primitive_type.clone(), primitive_literal)
+                                })
+                                .ok_or(Error::new(
+                                    ErrorKind::Unexpected,
+                                    "failed to convert to primitive literal",
+                                ))
+                        })
+                        .transpose()
+                    }));
+
+                Ok((datum_iterator, field_name))
+            })
+            .collect::<Result<Vec<_>>>()
+    }
+}
+
+impl SchemaWithPartnerVisitor<ArrayRef> for EqDelColumnProcessor<'_> {
+    type T = ();
+
+    fn schema(&mut self, _schema: &Schema, _partner: &ArrayRef, _value: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn field(&mut self, field: &NestedFieldRef, partner: &ArrayRef, _value: ()) -> Result<()> {
+        if self.equality_ids.contains(&field.id) && field.field_type.as_primitive_type().is_some() {
+            self.collected_columns.push((
+                partner.clone(),
+                field.name.clone(),
+                field.field_type.as_ref().clone(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn r#struct(
+        &mut self,
+        _struct: &StructType,
+        _partner: &ArrayRef,
+        _results: Vec<()>,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    fn list(&mut self, _list: &ListType, _partner: &ArrayRef, _value: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn map(
+        &mut self,
+        _map: &MapType,
+        _partner: &ArrayRef,
+        _key_value: (),
+        _value: (),
+    ) -> Result<()> {
+        Ok(())
+    }
+    fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> {
+        Ok(())
+    }
+}
+
+struct EqDelRecordBatchPartnerAccessor;
+
+impl PartnerAccessor<ArrayRef> for EqDelRecordBatchPartnerAccessor {
+    fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
+        Ok(schema_partner)
+    }
+
+    fn field_partner<'a>(
+        &self,
+        struct_partner: &'a ArrayRef,
+        field: &NestedField,
+    ) -> Result<&'a ArrayRef> {
+        let Some(struct_array) = struct_partner.as_any().downcast_ref::<StructArray>() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Expected struct array for field extraction",
+            ));
+        };
+
+        // Find the field by name within the struct
+        for (i, field_def) in struct_array.fields().iter().enumerate() {
+            if field_def.name() == &field.name {
+                return Ok(struct_array.column(i));
+            }
+        }
+
+        Err(Error::new(
+            ErrorKind::Unexpected,
+            format!("Field {} not found in parent struct", field.name),
+        ))
+    }
+
+    fn list_element_partner<'a>(&self, _list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "List columns are unsupported in equality deletes",
+        ))
+    }
+
+    fn map_key_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
         Err(Error::new(
             ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
+            "Map columns are unsupported in equality deletes",
+        ))
+    }
+
+    fn map_value_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Map columns are unsupported in equality deletes",
         ))
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::fs::File;
+    use std::sync::Arc;
+
+    use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
+    use arrow_schema::{DataType, Field, Fields};
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     use super::*;
     use crate::arrow::delete_filter::tests::setup;
 
+    #[tokio::test]
+    async fn test_delete_file_loader_parse_equality_deletes() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
+
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&eq_delete_file_path)
+            .await
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4, 6]);
+
+        let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await
+        .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) AND ((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5))".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }
+
+    /// Create a simple field with metadata.
+    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
+        arrow_schema::Field::new(name, ty, nullable).with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            value.to_string(),
+        )]))
+    }
+
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let col_s = Arc::new(StructArray::from(vec![
+            (
+                Arc::new(simple_field("sa", DataType::Int32, false, "6")),
+                Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
+            ),
+            (
+                Arc::new(simple_field("sb", DataType::Utf8, true, "7")),
+                Arc::new(StringArray::from(vec![Some("x"), None])) as ArrayRef,
+            ),
+        ]));
+
+        let equality_delete_schema = {
+            let struct_field = DataType::Struct(Fields::from(vec![
+                simple_field("sa", DataType::Int32, false, "6"),
+                simple_field("sb", DataType::Utf8, true, "7"),
+            ]));
+
+            let fields = vec![
+                Field::new("y", arrow_schema::DataType::Int64, true).with_metadata(HashMap::from(
+                    [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
+                )),
+                Field::new("z", arrow_schema::DataType::Int64, true).with_metadata(HashMap::from(
+                    [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
+                )),
+                Field::new("a", arrow_schema::DataType::Utf8, true).with_metadata(HashMap::from([
+                    (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
+                ])),
+                simple_field("s", struct_field, false, "5"),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write = RecordBatch::try_new(equality_delete_schema.clone(), vec![
+            col_y, col_z, col_a, col_s,
+        ])
+        .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
+
     #[tokio::test]
     async fn test_caching_delete_file_loader_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs
index 9ddd941f..cc3a561d 100644
--- a/crates/iceberg/src/arrow/value.rs
+++ b/crates/iceberg/src/arrow/value.rs
@@ -27,7 +27,8 @@ use uuid::Uuid;
 use super::get_field_id;
 use crate::spec::{
     ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType,
-    SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner,
+    SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
+    visit_type_with_partner,
 };
 use crate::{Error, ErrorKind, Result};
 
@@ -604,6 +605,20 @@ pub fn arrow_struct_to_literal(
     )
 }
 
+/// Convert arrow primitive array to iceberg primitive value array.
+/// This function will assume the schema of arrow struct array is the same as iceberg struct type.
+pub fn arrow_primitive_to_literal(
+    primitive_array: &ArrayRef,
+    ty: &Type,
+) -> Result<Vec<Option<Literal>>> {
+    visit_type_with_partner(
+        ty,
+        primitive_array,
+        &mut ArrowArrayToIcebergStructConverter,
+        &ArrowArrayAccessor::new(),
+    )
+}
+
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 1e4ef41b..3f7c29db 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -191,7 +191,6 @@ impl PlanContext {
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
 
-
         for manifest_file in manifest_files {
             let tx = if manifest_file.content == ManifestContentType::Deletes {
                 delete_file_tx.clone()