This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new adc5d20 Add missing arrow predicate pushdown implementations for
`StartsWith`, `NotStartsWith`, `In`, and `NotIn` (#404)
adc5d20 is described below
commit adc5d200582f820ef408975bf541218aa9b07d64
Author: Scott Donnelly <[email protected]>
AuthorDate: Sat Jun 15 10:11:33 2024 +0100
Add missing arrow predicate pushdown implementations for `StartsWith`,
`NotStartsWith`, `In`, and `NotIn` (#404)
* feat: add [not_]starts_with and [not_]in arrow predicate pushdown
* fixes from issues highlighted in review
---
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 1 +
crates/iceberg/src/arrow/reader.rs | 92 +++++++++++++----
crates/iceberg/src/arrow/schema.rs | 2 +
crates/iceberg/src/scan.rs | 110 ++++++++++++++++++++-
.../testdata/example_table_metadata_v2.json | 3 +-
6 files changed, 187 insertions(+), 22 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 5a6e2a9..8c1871e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ arrow-array = { version = "52" }
arrow-ord = { version = "52" }
arrow-schema = { version = "52" }
arrow-select = { version = "52" }
+arrow-string = { version = "52" }
async-stream = "0.3.5"
async-trait = "0.1"
aws-config = "1.1.8"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 7ebccad..5404ac9 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -44,6 +44,7 @@ arrow-array = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
+arrow-string = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index c10264b..7d04a59 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -22,6 +22,7 @@ use arrow_arith::boolean::{and, is_not_null, is_null, not,
or};
use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use arrow_string::like::starts_with;
use async_stream::try_stream;
use bytes::Bytes;
use fnv::FnvHashSet;
@@ -741,42 +742,98 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a>
{
fn starts_with(
&mut self,
- _reference: &BoundReference,
- _literal: &Datum,
+ reference: &BoundReference,
+ literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
- // TODO: Implement starts_with
- self.build_always_true()
+ if let Some(idx) = self.bound_reference(reference)? {
+ let literal = get_arrow_datum(literal)?;
+
+ Ok(Box::new(move |batch| {
+ let left = project_column(&batch, idx)?;
+ starts_with(&left, literal.as_ref())
+ }))
+ } else {
+ // Column absent from this data file: every value is null, a null never starts with the literal, so no row matches.
+ self.build_always_false()
+ }
}
fn not_starts_with(
&mut self,
- _reference: &BoundReference,
- _literal: &Datum,
+ reference: &BoundReference,
+ literal: &Datum,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
- // TODO: Implement not_starts_with
- self.build_always_true()
+ if let Some(idx) = self.bound_reference(reference)? {
+ let literal = get_arrow_datum(literal)?;
+
+ Ok(Box::new(move |batch| {
+ let left = project_column(&batch, idx)?;
+
+ // No native arrow not_starts_with kernel, so negate starts_with. NOTE(review): a null cell presumably stays null here (row dropped), unlike the missing-column branch below which keeps all rows -- confirm intended null semantics.
+ not(&starts_with(&left, literal.as_ref())?)
+ }))
+ } else {
+ // Column absent from this data file: every value is treated as null, and the negated predicate keeps every row.
+ self.build_always_true()
+ }
}
fn r#in(
&mut self,
- _reference: &BoundReference,
- _literals: &FnvHashSet<Datum>,
+ reference: &BoundReference,
+ literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
- // TODO: Implement in
- self.build_always_true()
+ if let Some(idx) = self.bound_reference(reference)? {
+ let literals: Vec<_> = literals
+ .iter()
+ .map(get_arrow_datum)
+ .collect::<Result<_>>()?;
+
+ Ok(Box::new(move |batch| {
+ // No native arrow is_in kernel: OR together one eq() mask per literal (update if arrow adds one).
+ let left = project_column(&batch, idx)?;
+ let mut acc = BooleanArray::from(vec![false; batch.num_rows()]);
+ for literal in &literals {
+ acc = or(&acc, &eq(&left, literal.as_ref())?)?
+ }
+
+ Ok(acc)
+ }))
+ } else {
+ // Column absent from this data file: every value is null, and null is never IN the set, so no row matches.
+ self.build_always_false()
+ }
}
fn not_in(
&mut self,
- _reference: &BoundReference,
- _literals: &FnvHashSet<Datum>,
+ reference: &BoundReference,
+ literals: &FnvHashSet<Datum>,
_predicate: &BoundPredicate,
) -> Result<Box<PredicateResult>> {
- // TODO: Implement not_in
- self.build_always_true()
+ if let Some(idx) = self.bound_reference(reference)? {
+ let literals: Vec<_> = literals
+ .iter()
+ .map(get_arrow_datum)
+ .collect::<Result<_>>()?;
+
+ Ok(Box::new(move |batch| {
+ // No native arrow not_in kernel: AND together one neq() mask per literal (update if arrow adds one).
+ let left = project_column(&batch, idx)?;
+ let mut acc = BooleanArray::from(vec![true; batch.num_rows()]);
+ for literal in &literals {
+ acc = and(&acc, &neq(&left, literal.as_ref())?)?
+ }
+
+ Ok(acc)
+ }))
+ } else {
+ // Column absent from this data file: every value is treated as null, and the negated predicate keeps every row.
+ self.build_always_true()
+ }
}
}
@@ -784,7 +841,8 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
///
/// # TODO
///
-///
[ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
contains the following hints to speed up metadata loading, we can consider
adding them to this struct:
+///
[ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
+/// contains the following hints to speed up metadata loading, we can consider
adding them to this struct:
///
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet
file's footer.
/// - `preload_column_index`: Load the Column Index as part of
[`Self::get_metadata`].
diff --git a/crates/iceberg/src/arrow/schema.rs
b/crates/iceberg/src/arrow/schema.rs
index 172d4bb..260e144 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -26,6 +26,7 @@ use crate::{Error, ErrorKind};
use arrow_array::types::{validate_decimal_precision_and_scale, Decimal128Type};
use arrow_array::{
BooleanArray, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array,
Int64Array,
+ StringArray,
};
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use bitvec::macros::internal::funty::Fundamental;
@@ -605,6 +606,7 @@ pub(crate) fn get_arrow_datum(datum: &Datum) ->
Result<Box<dyn ArrowDatum + Send
PrimitiveLiteral::Long(value) =>
Ok(Box::new(Int64Array::new_scalar(*value))),
PrimitiveLiteral::Float(value) =>
Ok(Box::new(Float32Array::new_scalar(value.as_f32()))),
PrimitiveLiteral::Double(value) =>
Ok(Box::new(Float64Array::new_scalar(value.as_f64()))),
+ PrimitiveLiteral::String(value) =>
Ok(Box::new(StringArray::new_scalar(value.as_str()))),
l => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5f0922e..d116170 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -523,7 +523,7 @@ mod tests {
};
use crate::table::Table;
use crate::TableIdent;
- use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
@@ -705,10 +705,15 @@ mod tests {
PARQUET_FIELD_ID_META_KEY.to_string(),
"3".to_string(),
)])),
+ arrow_schema::Field::new("a",
arrow_schema::DataType::Utf8, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "4".to_string(),
+ )])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
- // 3 columns:
+ // 4 columns:
// x: [1, 1, 1, 1, ...]
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024]))
as ArrayRef;
@@ -725,7 +730,14 @@ mod tests {
// z: [3, 3, 3, 3, ..., 4, 4, 4, 4]
let col3 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
- let to_write = RecordBatch::try_new(schema.clone(), vec![col1,
col2, col3]).unwrap();
+
+ // a: ["Apache", "Apache", "Apache", ..., "Iceberg", "Iceberg",
"Iceberg"]
+ let mut values = vec!["Apache"; 512];
+ values.append(vec!["Iceberg"; 512].as_mut());
+ let col4 = Arc::new(StringArray::from_iter_values(values)) as
ArrayRef;
+
+ let to_write =
+ RecordBatch::try_new(schema.clone(), vec![col1, col2, col3,
col4]).unwrap();
// Write the Parquet files
let props = WriterProperties::builder()
@@ -773,7 +785,7 @@ mod tests {
fn test_select_no_exist_column() {
let table = TableTestFixture::new().table;
- let table_scan = table.scan().select(["x", "y", "z", "a"]).build();
+ let table_scan = table.scan().select(["x", "y", "z", "a",
"b"]).build();
assert!(table_scan.is_err());
}
@@ -1040,4 +1052,94 @@ mod tests {
let expected_z = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
assert_eq!(col, &expected_z);
}
+
+ #[tokio::test]
+ async fn test_filter_on_arrow_startswith() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Filter: a STARTS WITH "Ice" -- of the 512 "Apache" + 512 "Iceberg" rows, only the "Iceberg" ones should survive.
+ let mut builder = fixture.table.scan();
+ let predicate = Reference::new("a").starts_with(Datum::string("Ice"));
+ builder = builder.filter(predicate);
+ let table_scan = builder.build().unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_rows(), 512);
+
+ let col = batches[0].column_by_name("a").unwrap();
+ let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(string_arr.value(0), "Iceberg");
+ }
+
+ #[tokio::test]
+ async fn test_filter_on_arrow_not_startswith() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Filter: a NOT STARTS WITH "Ice" -- only the 512 "Apache" rows should survive.
+ let mut builder = fixture.table.scan();
+ let predicate =
Reference::new("a").not_starts_with(Datum::string("Ice"));
+ builder = builder.filter(predicate);
+ let table_scan = builder.build().unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_rows(), 512);
+
+ let col = batches[0].column_by_name("a").unwrap();
+ let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(string_arr.value(0), "Apache");
+ }
+
+ #[tokio::test]
+ async fn test_filter_on_arrow_in() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Filter: a IN ("Sioux", "Iceberg") -- "Sioux" never occurs, so only the 512 "Iceberg" rows should match.
+ let mut builder = fixture.table.scan();
+ let predicate =
+ Reference::new("a").is_in([Datum::string("Sioux"),
Datum::string("Iceberg")]);
+ builder = builder.filter(predicate);
+ let table_scan = builder.build().unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_rows(), 512);
+
+ let col = batches[0].column_by_name("a").unwrap();
+ let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(string_arr.value(0), "Iceberg");
+ }
+
+ #[tokio::test]
+ async fn test_filter_on_arrow_not_in() {
+ let mut fixture = TableTestFixture::new();
+ fixture.setup_manifest_files().await;
+
+ // Filter: a NOT IN ("Sioux", "Iceberg") -- only the 512 "Apache" rows should survive.
+ let mut builder = fixture.table.scan();
+ let predicate =
+ Reference::new("a").is_not_in([Datum::string("Sioux"),
Datum::string("Iceberg")]);
+ builder = builder.filter(predicate);
+ let table_scan = builder.build().unwrap();
+
+ let batch_stream = table_scan.to_arrow().await.unwrap();
+
+ let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
+
+ assert_eq!(batches[0].num_rows(), 512);
+
+ let col = batches[0].column_by_name("a").unwrap();
+ let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(string_arr.value(0), "Apache");
+ }
}
diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json
b/crates/iceberg/testdata/example_table_metadata_v2.json
index 809c355..cf9fef9 100644
--- a/crates/iceberg/testdata/example_table_metadata_v2.json
+++ b/crates/iceberg/testdata/example_table_metadata_v2.json
@@ -15,7 +15,8 @@
"fields": [
{"id": 1, "name": "x", "required": true, "type": "long"},
{"id": 2, "name": "y", "required": true, "type": "long", "doc":
"comment"},
- {"id": 3, "name": "z", "required": true, "type": "long"}
+ {"id": 3, "name": "z", "required": true, "type": "long"},
+ {"id": 4, "name": "a", "required": true, "type": "string"}
]
}
],