This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a3909f9 Scan Delete Support Part 6: Equality Delete Parsing (#1017)
2a3909f9 is described below

commit 2a3909f98aa4d30e7df462f04096b61660ae5f5b
Author: Scott Donnelly <sc...@donnel.ly>
AuthorDate: Tue Sep 9 02:25:17 2025 +0100

    Scan Delete Support Part 6: Equality Delete Parsing (#1017)
    
    Concludes the series of scan delete file support PRs.
    
    * Adds parsing of equality delete files to `DeleteFileLoader`
    
    Issue: https://github.com/apache/iceberg-rust/issues/630
    
    ---------
    
    Co-authored-by: Renjie Liu <liurenjie2...@gmail.com>
---
 .../src/arrow/caching_delete_file_loader.rs        | 347 ++++++++++++++++++++-
 crates/iceberg/src/arrow/value.rs                  |  17 +-
 crates/iceberg/src/scan/context.rs                 |   1 -
 3 files changed, 352 insertions(+), 13 deletions(-)

diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index f0dece75..c6a8943d 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -15,19 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::ops::Not;
+use std::sync::Arc;
 
-use arrow_array::{Int64Array, StringArray};
+use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
 use futures::{StreamExt, TryStreamExt};
 use tokio::sync::oneshot::{Receiver, channel};
 
 use super::delete_filter::DeleteFilter;
 use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
+use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
 use crate::delete_vector::DeleteVector;
-use crate::expr::Predicate;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Predicate, Reference};
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
-use crate::spec::{DataContentType, SchemaRef};
+use crate::spec::{
+    DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor,
+    PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type,
+    visit_schema_with_partner,
+};
 use crate::{Error, ErrorKind, Result};
 
 #[derive(Clone, Debug)]
@@ -43,6 +51,7 @@ enum DeleteFileContext {
     PosDels(ArrowRecordBatchStream),
     FreshEqDel {
         batch_stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
         sender: tokio::sync::oneshot::Sender<Predicate>,
     },
 }
@@ -224,6 +233,7 @@ impl CachingDeleteFileLoader {
                     )
                     .await?,
                     sender,
+                    equality_ids: HashSet::from_iter(task.equality_ids.clone()),
                 })
             }
 
@@ -247,9 +257,11 @@ impl CachingDeleteFileLoader {
             DeleteFileContext::FreshEqDel {
                 sender,
                 batch_stream,
+                equality_ids,
             } => {
                 let predicate =
-                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream, equality_ids)
+                        .await?;
 
                 sender
                     .send(predicate)
@@ -308,28 +320,341 @@ impl CachingDeleteFileLoader {
         Ok(result)
     }
 
-    /// Parses record batch streams from individual equality delete files
-    ///
-    /// Returns an unbound Predicate for each batch stream
     async fn parse_equality_deletes_record_batch_stream(
-        streams: ArrowRecordBatchStream,
+        mut stream: ArrowRecordBatchStream,
+        equality_ids: HashSet<i32>,
     ) -> Result<Predicate> {
-        // TODO
+        let mut result_predicate = AlwaysTrue;
+        let mut batch_schema_iceberg: Option<Schema> = None;
+        let accessor = EqDelRecordBatchPartnerAccessor;
+
+        while let Some(record_batch) = stream.next().await {
+            let record_batch = record_batch?;
+
+            if record_batch.num_columns() == 0 {
+                return Ok(AlwaysTrue);
+            }
+
+            let schema = match &batch_schema_iceberg {
+                Some(schema) => schema,
+                None => {
+                    let schema = arrow_schema_to_schema(record_batch.schema().as_ref())?;
+                    batch_schema_iceberg = Some(schema);
+                    batch_schema_iceberg.as_ref().unwrap()
+                }
+            };
+
+            let root_array: ArrayRef = Arc::new(StructArray::from(record_batch));
+
+            let mut processor = EqDelColumnProcessor::new(&equality_ids);
+            visit_schema_with_partner(schema, &root_array, &mut processor, &accessor)?;
+
+            let mut datum_columns_with_names = processor.finish()?;
+            if datum_columns_with_names.is_empty() {
+                continue;
+            }
+
+            // Process the collected columns in lockstep
+            #[allow(clippy::len_zero)]
+            while datum_columns_with_names[0].0.len() > 0 {
+                let mut row_predicate = AlwaysTrue;
+                for &mut (ref mut column, ref field_name) in &mut datum_columns_with_names {
+                    if let Some(item) = column.next() {
+                        let cell_predicate = if let Some(datum) = item? {
+                            Reference::new(field_name.clone()).equal_to(datum.clone())
+                        } else {
+                            Reference::new(field_name.clone()).is_null()
+                        };
+                        row_predicate = row_predicate.and(cell_predicate)
+                    }
+                }
+                result_predicate = result_predicate.and(row_predicate.not());
+            }
+        }
+        Ok(result_predicate.rewrite_not())
+    }
+}
+
+struct EqDelColumnProcessor<'a> {
+    equality_ids: &'a HashSet<i32>,
+    collected_columns: Vec<(ArrayRef, String, Type)>,
+}
+
+impl<'a> EqDelColumnProcessor<'a> {
+    fn new(equality_ids: &'a HashSet<i32>) -> Self {
+        Self {
+            equality_ids,
+            collected_columns: Vec::with_capacity(equality_ids.len()),
+        }
+    }
+
+    #[allow(clippy::type_complexity)]
+    fn finish(
+        self,
+    ) -> Result<
+        Vec<(
+            Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>>,
+            String,
+        )>,
+    > {
+        self.collected_columns
+            .into_iter()
+            .map(|(array, field_name, field_type)| {
+                let primitive_type = field_type
+                    .as_primitive_type()
+                    .ok_or_else(|| {
+                        Error::new(ErrorKind::Unexpected, "field is not a primitive type")
+                    })?
+                    .clone();
+
+                let lit_vec = arrow_primitive_to_literal(&array, &field_type)?;
+                let datum_iterator: Box<dyn ExactSizeIterator<Item = Result<Option<Datum>>>> =
+                    Box::new(lit_vec.into_iter().map(move |c| {
+                        c.map(|literal| {
+                            literal
+                                .as_primitive_literal()
+                                .map(|primitive_literal| {
+                                    Datum::new(primitive_type.clone(), primitive_literal)
+                                })
+                                .ok_or(Error::new(
+                                    ErrorKind::Unexpected,
+                                    "failed to convert to primitive literal",
+                                ))
+                        })
+                        .transpose()
+                    }));
+
+                Ok((datum_iterator, field_name))
+            })
+            .collect::<Result<Vec<_>>>()
+    }
+}
+
+impl SchemaWithPartnerVisitor<ArrayRef> for EqDelColumnProcessor<'_> {
+    type T = ();
+
+    fn schema(&mut self, _schema: &Schema, _partner: &ArrayRef, _value: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn field(&mut self, field: &NestedFieldRef, partner: &ArrayRef, _value: ()) -> Result<()> {
+        if self.equality_ids.contains(&field.id) && field.field_type.as_primitive_type().is_some() {
+            self.collected_columns.push((
+                partner.clone(),
+                field.name.clone(),
+                field.field_type.as_ref().clone(),
+            ));
+        }
+        Ok(())
+    }
+
+    fn r#struct(
+        &mut self,
+        _struct: &StructType,
+        _partner: &ArrayRef,
+        _results: Vec<()>,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    fn list(&mut self, _list: &ListType, _partner: &ArrayRef, _value: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn map(
+        &mut self,
+        _map: &MapType,
+        _partner: &ArrayRef,
+        _key_value: (),
+        _value: (),
+    ) -> Result<()> {
+        Ok(())
+    }
 
+    fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> {
+        Ok(())
+    }
+}
+
+struct EqDelRecordBatchPartnerAccessor;
+
+impl PartnerAccessor<ArrayRef> for EqDelRecordBatchPartnerAccessor {
+    fn struct_partner<'a>(&self, schema_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
+        Ok(schema_partner)
+    }
+
+    fn field_partner<'a>(
+        &self,
+        struct_partner: &'a ArrayRef,
+        field: &NestedField,
+    ) -> Result<&'a ArrayRef> {
+        let Some(struct_array) = struct_partner.as_any().downcast_ref::<StructArray>() else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Expected struct array for field extraction",
+            ));
+        };
+
+        // Find the field by name within the struct
+        for (i, field_def) in struct_array.fields().iter().enumerate() {
+            if field_def.name() == &field.name {
+                return Ok(struct_array.column(i));
+            }
+        }
+
+        Err(Error::new(
+            ErrorKind::Unexpected,
+            format!("Field {} not found in parent struct", field.name),
+        ))
+    }
+
+    fn list_element_partner<'a>(&self, _list_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "List columns are unsupported in equality deletes",
+        ))
+    }
+
+    fn map_key_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
         Err(Error::new(
             ErrorKind::FeatureUnsupported,
-            "parsing of equality deletes is not yet supported",
+            "Map columns are unsupported in equality deletes",
+        ))
+    }
+
+    fn map_value_partner<'a>(&self, _map_partner: &'a ArrayRef) -> Result<&'a ArrayRef> {
+        Err(Error::new(
+            ErrorKind::FeatureUnsupported,
+            "Map columns are unsupported in equality deletes",
         ))
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::fs::File;
+    use std::sync::Arc;
+
+    use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
+    use arrow_schema::{DataType, Field, Fields};
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     use super::*;
     use crate::arrow::delete_filter::tests::setup;
 
+    #[tokio::test]
+    async fn test_delete_file_loader_parse_equality_deletes() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+
+        let eq_delete_file_path = setup_write_equality_delete_file_1(table_location);
+
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+        let record_batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&eq_delete_file_path)
+            .await
+            .expect("could not get batch stream");
+
+        let eq_ids = HashSet::from_iter(vec![2, 3, 4, 6]);
+
+        let parsed_eq_delete = CachingDeleteFileLoader::parse_equality_deletes_record_batch_stream(
+            record_batch_stream,
+            eq_ids,
+        )
+        .await
+        .expect("error parsing batch stream");
+        println!("{}", parsed_eq_delete);
+
+        let expected = "((((y != 1) OR (z != 100)) OR (a != \"HELP\")) OR (sa != 4)) AND ((((y != 2) OR (z IS NOT NULL)) OR (a IS NOT NULL)) OR (sa != 5))".to_string();
+
+        assert_eq!(parsed_eq_delete.to_string(), expected);
+    }
+
+    /// Create a simple field with metadata.
+    fn simple_field(name: &str, ty: DataType, nullable: bool, value: &str) -> Field {
+        arrow_schema::Field::new(name, ty, nullable).with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            value.to_string(),
+        )]))
+    }
+
+    fn setup_write_equality_delete_file_1(table_location: &str) -> String {
+        let col_y_vals = vec![1, 2];
+        let col_y = Arc::new(Int64Array::from(col_y_vals)) as ArrayRef;
+
+        let col_z_vals = vec![Some(100), None];
+        let col_z = Arc::new(Int64Array::from(col_z_vals)) as ArrayRef;
+
+        let col_a_vals = vec![Some("HELP"), None];
+        let col_a = Arc::new(StringArray::from(col_a_vals)) as ArrayRef;
+
+        let col_s = Arc::new(StructArray::from(vec![
+            (
+                Arc::new(simple_field("sa", DataType::Int32, false, "6")),
+                Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
+            ),
+            (
+                Arc::new(simple_field("sb", DataType::Utf8, true, "7")),
+                Arc::new(StringArray::from(vec![Some("x"), None])) as ArrayRef,
+            ),
+        ]));
+
+        let equality_delete_schema = {
+            let struct_field = DataType::Struct(Fields::from(vec![
+                simple_field("sa", DataType::Int32, false, "6"),
+                simple_field("sb", DataType::Utf8, true, "7"),
+            ]));
+
+            let fields = vec![
+                Field::new("y", arrow_schema::DataType::Int64, 
true).with_metadata(HashMap::from(
+                    [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())],
+                )),
+                Field::new("z", arrow_schema::DataType::Int64, 
true).with_metadata(HashMap::from(
+                    [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())],
+                )),
+                Field::new("a", arrow_schema::DataType::Utf8, 
true).with_metadata(HashMap::from([
+                    (PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()),
+                ])),
+                simple_field("s", struct_field, false, "5"),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+
+        let equality_deletes_to_write = RecordBatch::try_new(equality_delete_schema.clone(), vec![
+            col_y, col_z, col_a, col_s,
+        ])
+        .unwrap();
+
+        let path = format!("{}/equality-deletes-1.parquet", &table_location);
+
+        let file = File::create(&path).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let mut writer = ArrowWriter::try_new(
+            file,
+            equality_deletes_to_write.schema(),
+            Some(props.clone()),
+        )
+        .unwrap();
+
+        writer
+            .write(&equality_deletes_to_write)
+            .expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        path
+    }
+
     #[tokio::test]
     async fn test_caching_delete_file_loader_load_deletes() {
         let tmp_dir = TempDir::new().unwrap();
diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs
index 9ddd941f..cc3a561d 100644
--- a/crates/iceberg/src/arrow/value.rs
+++ b/crates/iceberg/src/arrow/value.rs
@@ -27,7 +27,8 @@ use uuid::Uuid;
 use super::get_field_id;
 use crate::spec::{
     ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveType,
-    SchemaWithPartnerVisitor, Struct, StructType, visit_struct_with_partner,
+    SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner,
+    visit_type_with_partner,
 };
 use crate::{Error, ErrorKind, Result};
 
@@ -604,6 +605,20 @@ pub fn arrow_struct_to_literal(
     )
 }
 
+/// Convert arrow primitive array to iceberg primitive value array.
+/// This function assumes the type of the arrow array matches the given iceberg type.
+pub fn arrow_primitive_to_literal(
+    primitive_array: &ArrayRef,
+    ty: &Type,
+) -> Result<Vec<Option<Literal>>> {
+    visit_type_with_partner(
+        ty,
+        primitive_array,
+        &mut ArrowArrayToIcebergStructConverter,
+        &ArrowArrayAccessor::new(),
+    )
+}
+
 #[cfg(test)]
 mod test {
     use std::collections::HashMap;
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index 1e4ef41b..3f7c29db 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -191,7 +191,6 @@ impl PlanContext {
 
         // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
         let mut filtered_mfcs = vec![];
-
         for manifest_file in manifest_files {
             let tx = if manifest_file.content == ManifestContentType::Deletes {
                 delete_file_tx.clone()

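As a rough illustration of what the new parse_equality_deletes_record_batch_stream produces: each row of an equality delete file becomes the negation of a per-column conjunction, and the row predicates are AND-ed together. The minimal sketch below uses the same expression API that appears in the diff; the iceberg:: crate paths and the Datum::long constructor are assumptions for the example, not part of this commit.

    use std::ops::Not;

    use iceberg::expr::{Predicate, Reference};
    use iceberg::spec::Datum;

    // A single delete row {y: 1, z: 100} keeps every data row that does NOT match it:
    // NOT (y = 1 AND z = 100), which rewrite_not() turns into (y != 1) OR (z != 100),
    // matching the shape of the `expected` predicate in the new test.
    fn example_predicate() -> Predicate {
        let row = Reference::new("y")
            .equal_to(Datum::long(1))
            .and(Reference::new("z").equal_to(Datum::long(100)));
        Predicate::AlwaysTrue.and(row.not()).rewrite_not()
    }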