This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new adc5d20  Add missing arrow predicate pushdown implementations for `StartsWith`, `NotStartsWith`, `In`, and `NotIn` (#404)
adc5d20 is described below

commit adc5d200582f820ef408975bf541218aa9b07d64
Author: Scott Donnelly <[email protected]>
AuthorDate: Sat Jun 15 10:11:33 2024 +0100

    Add missing arrow predicate pushdown implementations for `StartsWith`, `NotStartsWith`, `In`, and `NotIn` (#404)
    
    * feat: add [not_]starts_with and [not_]in arrow predicate pushdown
    
    * fixes from issues highlighted in review
---
 Cargo.toml                                         |   1 +
 crates/iceberg/Cargo.toml                          |   1 +
 crates/iceberg/src/arrow/reader.rs                 |  92 +++++++++++++----
 crates/iceberg/src/arrow/schema.rs                 |   2 +
 crates/iceberg/src/scan.rs                         | 110 ++++++++++++++++++++-
 .../testdata/example_table_metadata_v2.json        |   3 +-
 6 files changed, 187 insertions(+), 22 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 5a6e2a9..8c1871e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ arrow-array = { version = "52" }
 arrow-ord = { version = "52" }
 arrow-schema = { version = "52" }
 arrow-select = { version = "52" }
+arrow-string = { version = "52" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 aws-config = "1.1.8"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 7ebccad..5404ac9 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -44,6 +44,7 @@ arrow-array = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
+arrow-string = { workspace = true }
 async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index c10264b..7d04a59 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -22,6 +22,7 @@ use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
 use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use arrow_string::like::starts_with;
 use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
@@ -741,42 +742,98 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
 
     fn starts_with(
         &mut self,
-        _reference: &BoundReference,
-        _literal: &Datum,
+        reference: &BoundReference,
+        literal: &Datum,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement starts_with
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(move |batch| {
+                let left = project_column(&batch, idx)?;
+                starts_with(&left, literal.as_ref())
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_false()
+        }
     }
 
     fn not_starts_with(
         &mut self,
-        _reference: &BoundReference,
-        _literal: &Datum,
+        reference: &BoundReference,
+        literal: &Datum,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement not_starts_with
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literal = get_arrow_datum(literal)?;
+
+            Ok(Box::new(move |batch| {
+                let left = project_column(&batch, idx)?;
+
+                // update here if arrow ever adds a native not_starts_with
+                not(&starts_with(&left, literal.as_ref())?)
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_true()
+        }
     }
 
     fn r#in(
         &mut self,
-        _reference: &BoundReference,
-        _literals: &FnvHashSet<Datum>,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement in
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literals: Vec<_> = literals
+                .iter()
+                .map(|lit| get_arrow_datum(lit).unwrap())
+                .collect();
+
+            Ok(Box::new(move |batch| {
+                // update this if arrow ever adds a native is_in kernel
+                let left = project_column(&batch, idx)?;
+                let mut acc = BooleanArray::from(vec![false; 
batch.num_rows()]);
+                for literal in &literals {
+                    acc = or(&acc, &eq(&left, literal.as_ref())?)?
+                }
+
+                Ok(acc)
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_false()
+        }
     }
 
     fn not_in(
         &mut self,
-        _reference: &BoundReference,
-        _literals: &FnvHashSet<Datum>,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
         _predicate: &BoundPredicate,
     ) -> Result<Box<PredicateResult>> {
-        // TODO: Implement not_in
-        self.build_always_true()
+        if let Some(idx) = self.bound_reference(reference)? {
+            let literals: Vec<_> = literals
+                .iter()
+                .map(|lit| get_arrow_datum(lit).unwrap())
+                .collect();
+
+            Ok(Box::new(move |batch| {
+                // update this if arrow ever adds a native not_in kernel
+                let left = project_column(&batch, idx)?;
+                let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
+                for literal in &literals {
+                    acc = and(&acc, &neq(&left, literal.as_ref())?)?
+                }
+
+                Ok(acc)
+            }))
+        } else {
+            // A missing column, treating it as null.
+            self.build_always_true()
+        }
     }
 }
 
@@ -784,7 +841,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
 ///
 /// # TODO
 ///
-/// 
[ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
 contains the following hints to speed up metadata loading, we can consider 
adding them to this struct:
+/// 
[ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
+/// contains the following hints to speed up metadata loading, we can consider 
adding them to this struct:
 ///
 /// - `metadata_size_hint`: Provide a hint as to the size of the parquet 
file's footer.
 /// - `preload_column_index`: Load the Column Index  as part of 
[`Self::get_metadata`].
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index 172d4bb..260e144 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -26,6 +26,7 @@ use crate::{Error, ErrorKind};
 use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type};
 use arrow_array::{
     BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, 
Int64Array,
+    StringArray,
 };
 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
 use bitvec::macros::internal::funty::Fundamental;
@@ -605,6 +606,7 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
         PrimitiveLiteral::Long(value) => 
Ok(Box::new(Int64Array::new_scalar(*value))),
         PrimitiveLiteral::Float(value) => 
Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
         PrimitiveLiteral::Double(value) => 
Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+        PrimitiveLiteral::String(value) => 
Ok(Box::new(StringArray::new_scalar(value.as_str()))),
         l => Err(Error::new(
             ErrorKind::FeatureUnsupported,
             format!(
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5f0922e..d116170 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -523,7 +523,7 @@ mod tests {
     };
     use crate::table::Table;
     use crate::TableIdent;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
     use futures::TryStreamExt;
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
     use parquet::basic::Compression;
@@ -705,10 +705,15 @@ mod tests {
                             PARQUET_FIELD_ID_META_KEY.to_string(),
                             "3".to_string(),
                         )])),
+                    arrow_schema::Field::new("a", 
arrow_schema::DataType::Utf8, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "4".to_string(),
+                        )])),
                 ];
                 Arc::new(arrow_schema::Schema::new(fields))
             };
-            // 3 columns:
+            // 4 columns:
             // x: [1, 1, 1, 1, ...]
             let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) 
as ArrayRef;
 
@@ -725,7 +730,14 @@ mod tests {
 
             // z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
             let col3 = Arc::new(Int64Array::from_iter_values(values)) as 
ArrayRef;
-            let to_write = RecordBatch::try_new(schema.clone(), vec![col1, 
col2, col3]).unwrap();
+
+            // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg", 
"Iceberg"]
+            let mut values = vec!["Apache"; 512];
+            values.append(vec!["Iceberg"; 512].as_mut());
+            let col4 = Arc::new(StringArray::from_iter_values(values)) as 
ArrayRef;
+
+            let to_write =
+                RecordBatch::try_new(schema.clone(), vec![col1, col2, col3, 
col4]).unwrap();
 
             // Write the Parquet files
             let props = WriterProperties::builder()
@@ -773,7 +785,7 @@ mod tests {
     fn test_select_no_exist_column() {
         let table = TableTestFixture::new().table;
 
-        let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
+        let table_scan = table.scan().select(["x", "y", "z", "a", 
"b"]).build();
         assert!(table_scan.is_err());
     }
 
@@ -1040,4 +1052,94 @@ mod tests {
         let expected_z = Arc::new(Int64Array::from_iter_values(values)) as 
ArrayRef;
         assert_eq!(col, &expected_z);
     }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_startswith() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a STARTSWITH "Ice"
+        let mut builder = fixture.table.scan();
+        let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Iceberg");
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_not_startswith() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a NOT STARTSWITH "Ice"
+        let mut builder = fixture.table.scan();
+        let predicate = 
Reference::new("a").not_starts_with(Datum::string("Ice"));
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Apache");
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_in() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a IN ("Sioux", "Iceberg")
+        let mut builder = fixture.table.scan();
+        let predicate =
+            Reference::new("a").is_in([Datum::string("Sioux"), 
Datum::string("Iceberg")]);
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Iceberg");
+    }
+
+    #[tokio::test]
+    async fn test_filter_on_arrow_not_in() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        // Filter: a NOT IN ("Sioux", "Iceberg")
+        let mut builder = fixture.table.scan();
+        let predicate =
+            Reference::new("a").is_not_in([Datum::string("Sioux"), 
Datum::string("Iceberg")]);
+        builder = builder.filter(predicate);
+        let table_scan = builder.build().unwrap();
+
+        let batch_stream = table_scan.to_arrow().await.unwrap();
+
+        let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+        assert_eq!(batches[0].num_rows(), 512);
+
+        let col = batches[0].column_by_name("a").unwrap();
+        let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+        assert_eq!(string_arr.value(0), "Apache");
+    }
 }
diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json
index 809c355..cf9fef9 100644
--- a/crates/iceberg/testdata/example_table_metadata_v2.json
+++ b/crates/iceberg/testdata/example_table_metadata_v2.json
@@ -15,7 +15,8 @@
       "fields": [
         {"id": 1, "name": "x", "required": true, "type": "long"},
         {"id": 2, "name": "y", "required": true, "type": "long", "doc": 
"comment"},
-        {"id": 3, "name": "z", "required": true, "type": "long"}
+        {"id": 3, "name": "z", "required": true, "type": "long"},
+        {"id": 4, "name": "a", "required": true, "type": "string"}
       ]
     }
   ],

Reply via email to