This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 026b436f feat(datafusion): Support pushdown more datafusion exprs to
Iceberg (#649)
026b436f is described below
commit 026b436ff457b05da1fa05ea60e69260f62bb90c
Author: FANNG <[email protected]>
AuthorDate: Sat Oct 12 13:56:35 2024 +0800
feat(datafusion): Support pushdown more datafusion exprs to Iceberg (#649)
* support more datafusion predicates
* add it
* reverse op
---
crates/iceberg/src/expr/predicate.rs | 27 +-
.../src/physical_plan/expr_to_predicate.rs | 454 ++++++++++++---------
.../datafusion/src/physical_plan/scan.rs | 26 +-
crates/integrations/datafusion/src/table.rs | 13 +-
.../tests/integration_datafusion_test.rs | 54 ++-
5 files changed, 340 insertions(+), 234 deletions(-)
diff --git a/crates/iceberg/src/expr/predicate.rs
b/crates/iceberg/src/expr/predicate.rs
index acf21a5b..76befb6d 100644
--- a/crates/iceberg/src/expr/predicate.rs
+++ b/crates/iceberg/src/expr/predicate.rs
@@ -132,7 +132,16 @@ impl<T: Bind> Bind for UnaryExpression<T> {
}
impl<T> UnaryExpression<T> {
- pub(crate) fn new(op: PredicateOperator, term: T) -> Self {
+ /// Creates a unary expression with the given operator and term.
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use iceberg::expr::{PredicateOperator, Reference, UnaryExpression};
+ ///
+ /// UnaryExpression::new(PredicateOperator::IsNull, Reference::new("c"));
+ /// ```
+ pub fn new(op: PredicateOperator, term: T) -> Self {
debug_assert!(op.is_unary());
Self { op, term }
}
@@ -171,7 +180,21 @@ impl<T: Debug> Debug for BinaryExpression<T> {
}
impl<T> BinaryExpression<T> {
- pub(crate) fn new(op: PredicateOperator, term: T, literal: Datum) -> Self {
+ /// Creates a binary expression with the given operator, term and literal.
+ ///
+ /// # Example
+ ///
+ /// ```rust
+ /// use iceberg::expr::{BinaryExpression, PredicateOperator, Reference};
+ /// use iceberg::spec::Datum;
+ ///
+ /// BinaryExpression::new(
+ /// PredicateOperator::LessThanOrEq,
+ /// Reference::new("a"),
+ /// Datum::int(10),
+ /// );
+ /// ```
+ pub fn new(op: PredicateOperator, term: T, literal: Datum) -> Self {
debug_assert!(op.is_binary());
Self { op, term, literal }
}
diff --git
a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index 110e4f7e..f438308e 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -15,103 +15,179 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::VecDeque;
+use std::vec;
-use datafusion::common::tree_node::{TreeNodeRecursion, TreeNodeVisitor};
-use datafusion::common::Column;
-use datafusion::error::DataFusionError;
use datafusion::logical_expr::{Expr, Operator};
use datafusion::scalar::ScalarValue;
-use iceberg::expr::{Predicate, Reference};
+use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference,
UnaryExpression};
use iceberg::spec::Datum;
-pub struct ExprToPredicateVisitor {
- stack: VecDeque<Option<Predicate>>,
+// A datafusion expression could be an Iceberg predicate, column, or literal.
+enum TransformedResult {
+ Predicate(Predicate),
+ Column(Reference),
+ Literal(Datum),
+ NotTransformed,
}
-impl ExprToPredicateVisitor {
- /// Create a new predicate conversion visitor.
- pub fn new() -> Self {
- Self {
- stack: VecDeque::new(),
+
+enum OpTransformedResult {
+ Operator(PredicateOperator),
+ And,
+ Or,
+ NotTransformed,
+}
+
+/// Converts DataFusion filters ([`Expr`]) to an iceberg [`Predicate`].
+/// If none of the filters could be converted, return `None` which adds no
predicates to the scan operation.
+/// If the conversion was successful, return the converted predicates combined
with an AND operator.
+pub fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
+ filters
+ .iter()
+ .filter_map(convert_filter_to_predicate)
+ .reduce(Predicate::and)
+}
+
+fn convert_filter_to_predicate(expr: &Expr) -> Option<Predicate> {
+ match to_iceberg_predicate(expr) {
+ TransformedResult::Predicate(predicate) => Some(predicate),
+ TransformedResult::Column(_) | TransformedResult::Literal(_) => {
+ unreachable!("Not a valid expression: {:?}", expr)
}
+ _ => None,
}
- /// Get the predicate from the stack.
- pub fn get_predicate(&self) -> Option<Predicate> {
- self.stack
- .iter()
- .filter_map(|opt| opt.clone())
- .reduce(Predicate::and)
+}
+
+fn to_iceberg_predicate(expr: &Expr) -> TransformedResult {
+ match expr {
+ Expr::BinaryExpr(binary) => {
+ let left = to_iceberg_predicate(&binary.left);
+ let right = to_iceberg_predicate(&binary.right);
+ let op = to_iceberg_operation(binary.op);
+ match op {
+ OpTransformedResult::Operator(op) =>
to_iceberg_binary_predicate(left, right, op),
+ OpTransformedResult::And => to_iceberg_and_predicate(left,
right),
+ OpTransformedResult::Or => to_iceberg_or_predicate(left,
right),
+ OpTransformedResult::NotTransformed =>
TransformedResult::NotTransformed,
+ }
+ }
+ Expr::Not(exp) => {
+ let expr = to_iceberg_predicate(exp);
+ match expr {
+ TransformedResult::Predicate(p) =>
TransformedResult::Predicate(!p),
+ _ => TransformedResult::NotTransformed,
+ }
+ }
+ Expr::Column(column) =>
TransformedResult::Column(Reference::new(column.name())),
+ Expr::Literal(literal) => match scalar_value_to_datum(literal) {
+ Some(data) => TransformedResult::Literal(data),
+ None => TransformedResult::NotTransformed,
+ },
+ Expr::InList(inlist) => {
+ let mut datums = vec![];
+ for expr in &inlist.list {
+ let p = to_iceberg_predicate(expr);
+ match p {
+ TransformedResult::Literal(l) => datums.push(l),
+ _ => return TransformedResult::NotTransformed,
+ }
+ }
+
+ let expr = to_iceberg_predicate(&inlist.expr);
+ match expr {
+ TransformedResult::Column(r) => match inlist.negated {
+ false => TransformedResult::Predicate(r.is_in(datums)),
+ true => TransformedResult::Predicate(r.is_not_in(datums)),
+ },
+ _ => TransformedResult::NotTransformed,
+ }
+ }
+ Expr::IsNull(expr) => {
+ let p = to_iceberg_predicate(expr);
+ match p {
+ TransformedResult::Column(r) =>
TransformedResult::Predicate(Predicate::Unary(
+ UnaryExpression::new(PredicateOperator::IsNull, r),
+ )),
+ _ => TransformedResult::NotTransformed,
+ }
+ }
+ Expr::IsNotNull(expr) => {
+ let p = to_iceberg_predicate(expr);
+ match p {
+ TransformedResult::Column(r) =>
TransformedResult::Predicate(Predicate::Unary(
+ UnaryExpression::new(PredicateOperator::NotNull, r),
+ )),
+ _ => TransformedResult::NotTransformed,
+ }
+ }
+ _ => TransformedResult::NotTransformed,
}
+}
- /// Convert a column expression to an iceberg predicate.
- fn convert_column_expr(
- &self,
- col: &Column,
- op: &Operator,
- lit: &ScalarValue,
- ) -> Option<Predicate> {
- let reference = Reference::new(col.name.clone());
- let datum = scalar_value_to_datum(lit)?;
- Some(binary_op_to_predicate(reference, op, datum))
+fn to_iceberg_operation(op: Operator) -> OpTransformedResult {
+ match op {
+ Operator::Eq => OpTransformedResult::Operator(PredicateOperator::Eq),
+ Operator::NotEq =>
OpTransformedResult::Operator(PredicateOperator::NotEq),
+ Operator::Lt =>
OpTransformedResult::Operator(PredicateOperator::LessThan),
+ Operator::LtEq =>
OpTransformedResult::Operator(PredicateOperator::LessThanOrEq),
+ Operator::Gt =>
OpTransformedResult::Operator(PredicateOperator::GreaterThan),
+ Operator::GtEq =>
OpTransformedResult::Operator(PredicateOperator::GreaterThanOrEq),
+ // AND OR
+ Operator::And => OpTransformedResult::And,
+ Operator::Or => OpTransformedResult::Or,
+ // Others not supported
+ _ => OpTransformedResult::NotTransformed,
}
+}
- /// Convert a compound expression to an iceberg predicate.
- ///
- /// The strategy is to support the following cases:
- /// - if its an AND expression then the result will be the valid
predicates, whether there are 2 or just 1
- /// - if its an OR expression then a predicate will be returned only if
there are 2 valid predicates on both sides
- fn convert_compound_expr(&self, valid_preds: &[Predicate], op: &Operator)
-> Option<Predicate> {
- let valid_preds_count = valid_preds.len();
- match (op, valid_preds_count) {
- (Operator::And, 1) => valid_preds.first().cloned(),
- (Operator::And, 2) => Some(Predicate::and(
- valid_preds[0].clone(),
- valid_preds[1].clone(),
- )),
- (Operator::Or, 2) => Some(Predicate::or(
- valid_preds[0].clone(),
- valid_preds[1].clone(),
- )),
- _ => None,
+fn to_iceberg_and_predicate(
+ left: TransformedResult,
+ right: TransformedResult,
+) -> TransformedResult {
+ match (left, right) {
+ (TransformedResult::Predicate(left),
TransformedResult::Predicate(right)) => {
+ TransformedResult::Predicate(left.and(right))
}
+ (TransformedResult::Predicate(left), _) =>
TransformedResult::Predicate(left),
+ (_, TransformedResult::Predicate(right)) =>
TransformedResult::Predicate(right),
+ _ => TransformedResult::NotTransformed,
}
}
-// Implement TreeNodeVisitor for ExprToPredicateVisitor
-impl<'n> TreeNodeVisitor<'n> for ExprToPredicateVisitor {
- type Node = Expr;
-
- fn f_down(&mut self, _node: &'n Expr) -> Result<TreeNodeRecursion,
DataFusionError> {
- Ok(TreeNodeRecursion::Continue)
+fn to_iceberg_or_predicate(left: TransformedResult, right: TransformedResult)
-> TransformedResult {
+ match (left, right) {
+ (TransformedResult::Predicate(left),
TransformedResult::Predicate(right)) => {
+ TransformedResult::Predicate(left.or(right))
+ }
+ _ => TransformedResult::NotTransformed,
}
+}
- fn f_up(&mut self, expr: &'n Expr) -> Result<TreeNodeRecursion,
DataFusionError> {
- if let Expr::BinaryExpr(binary) = expr {
- match (&*binary.left, &binary.op, &*binary.right) {
- // process simple binary expressions, e.g. col > 1
- (Expr::Column(col), op, Expr::Literal(lit)) => {
- let col_pred = self.convert_column_expr(col, op, lit);
- self.stack.push_back(col_pred);
- }
- // // process reversed binary expressions, e.g. 1 < col
- (Expr::Literal(lit), op, Expr::Column(col)) => {
- let col_pred = op
- .swap()
- .and_then(|negated_op| self.convert_column_expr(col,
&negated_op, lit));
- self.stack.push_back(col_pred);
- }
- // process compound expressions (involving logical operators.
e.g., AND or OR and children)
- (_left, op, _right) if op.is_logic_operator() => {
- let right_pred = self.stack.pop_back().flatten();
- let left_pred = self.stack.pop_back().flatten();
- let children: Vec<_> = [left_pred,
right_pred].into_iter().flatten().collect();
- let compound_pred = self.convert_compound_expr(&children,
op);
- self.stack.push_back(compound_pred);
- }
- _ => return Ok(TreeNodeRecursion::Continue),
- }
+fn to_iceberg_binary_predicate(
+ left: TransformedResult,
+ right: TransformedResult,
+ op: PredicateOperator,
+) -> TransformedResult {
+ let (r, d, op) = match (left, right) {
+ (TransformedResult::NotTransformed, _) => return
TransformedResult::NotTransformed,
+ (_, TransformedResult::NotTransformed) => return
TransformedResult::NotTransformed,
+ (TransformedResult::Column(r), TransformedResult::Literal(d)) => (r,
d, op),
+ (TransformedResult::Literal(d), TransformedResult::Column(r)) => {
+ (r, d, reverse_predicate_operator(op))
}
- Ok(TreeNodeRecursion::Continue)
+ _ => return TransformedResult::NotTransformed,
+ };
+ TransformedResult::Predicate(Predicate::Binary(BinaryExpression::new(op,
r, d)))
+}
+
+fn reverse_predicate_operator(op: PredicateOperator) -> PredicateOperator {
+ match op {
+ PredicateOperator::Eq => PredicateOperator::Eq,
+ PredicateOperator::NotEq => PredicateOperator::NotEq,
+ PredicateOperator::GreaterThan => PredicateOperator::LessThan,
+ PredicateOperator::GreaterThanOrEq => PredicateOperator::LessThanOrEq,
+ PredicateOperator::LessThan => PredicateOperator::GreaterThan,
+ PredicateOperator::LessThanOrEq => PredicateOperator::GreaterThanOrEq,
+ _ => unreachable!("Reverse {}", op),
}
}
@@ -133,93 +209,113 @@ fn scalar_value_to_datum(value: &ScalarValue) ->
Option<Datum> {
}
}
-/// convert the data fusion Exp to an iceberg [`Predicate`]
-fn binary_op_to_predicate(reference: Reference, op: &Operator, datum: Datum)
-> Predicate {
- match op {
- Operator::Eq => reference.equal_to(datum),
- Operator::NotEq => reference.not_equal_to(datum),
- Operator::Lt => reference.less_than(datum),
- Operator::LtEq => reference.less_than_or_equal_to(datum),
- Operator::Gt => reference.greater_than(datum),
- Operator::GtEq => reference.greater_than_or_equal_to(datum),
- _ => Predicate::AlwaysTrue,
- }
-}
-
#[cfg(test)]
mod tests {
- use std::collections::VecDeque;
-
use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::common::tree_node::TreeNode;
use datafusion::common::DFSchema;
- use datafusion::prelude::SessionContext;
+ use datafusion::logical_expr::utils::split_conjunction;
+ use datafusion::prelude::{Expr, SessionContext};
use iceberg::expr::{Predicate, Reference};
use iceberg::spec::Datum;
- use super::ExprToPredicateVisitor;
+ use super::convert_filters_to_predicate;
fn create_test_schema() -> DFSchema {
let arrow_schema = Schema::new(vec![
- Field::new("foo", DataType::Int32, false),
- Field::new("bar", DataType::Utf8, false),
+ Field::new("foo", DataType::Int32, true),
+ Field::new("bar", DataType::Utf8, true),
]);
DFSchema::try_from_qualified_schema("my_table", &arrow_schema).unwrap()
}
- #[test]
- fn test_predicate_conversion_with_single_condition() {
- let sql = "foo > 1";
+ fn convert_to_iceberg_predicate(sql: &str) -> Option<Predicate> {
let df_schema = create_test_schema();
let expr = SessionContext::new()
.parse_sql_expr(sql, &df_schema)
.unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate().unwrap();
+ let exprs: Vec<Expr> =
split_conjunction(&expr).into_iter().cloned().collect();
+ convert_filters_to_predicate(&exprs[..])
+ }
+
+ #[test]
+ fn test_predicate_conversion_with_single_condition() {
+ let predicate = convert_to_iceberg_predicate("foo = 1").unwrap();
+ assert_eq!(predicate, Reference::new("foo").equal_to(Datum::long(1)));
+
+ let predicate = convert_to_iceberg_predicate("foo != 1").unwrap();
+ assert_eq!(
+ predicate,
+ Reference::new("foo").not_equal_to(Datum::long(1))
+ );
+
+ let predicate = convert_to_iceberg_predicate("foo > 1").unwrap();
assert_eq!(
predicate,
Reference::new("foo").greater_than(Datum::long(1))
);
+
+ let predicate = convert_to_iceberg_predicate("foo >= 1").unwrap();
+ assert_eq!(
+ predicate,
+ Reference::new("foo").greater_than_or_equal_to(Datum::long(1))
+ );
+
+ let predicate = convert_to_iceberg_predicate("foo < 1").unwrap();
+ assert_eq!(predicate, Reference::new("foo").less_than(Datum::long(1)));
+
+ let predicate = convert_to_iceberg_predicate("foo <= 1").unwrap();
+ assert_eq!(
+ predicate,
+ Reference::new("foo").less_than_or_equal_to(Datum::long(1))
+ );
+
+ let predicate = convert_to_iceberg_predicate("foo is null").unwrap();
+ assert_eq!(predicate, Reference::new("foo").is_null());
+
+ let predicate = convert_to_iceberg_predicate("foo is not
null").unwrap();
+ assert_eq!(predicate, Reference::new("foo").is_not_null());
+
+ let predicate = convert_to_iceberg_predicate("foo in (5, 6)").unwrap();
+ assert_eq!(
+ predicate,
+ Reference::new("foo").is_in([Datum::long(5), Datum::long(6)])
+ );
+
+ let predicate = convert_to_iceberg_predicate("foo not in (5,
6)").unwrap();
+ assert_eq!(
+ predicate,
+ Reference::new("foo").is_not_in([Datum::long(5), Datum::long(6)])
+ );
+
+ let predicate = convert_to_iceberg_predicate("not foo = 1").unwrap();
+ assert_eq!(predicate, !Reference::new("foo").equal_to(Datum::long(1)));
}
+
#[test]
fn test_predicate_conversion_with_single_unsupported_condition() {
- let sql = "foo is null";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate();
+ let predicate = convert_to_iceberg_predicate("foo + 1 = 1");
+ assert_eq!(predicate, None);
+
+ let predicate = convert_to_iceberg_predicate("length(bar) = 1");
+ assert_eq!(predicate, None);
+
+ let predicate = convert_to_iceberg_predicate("foo in (1, 2, foo)");
assert_eq!(predicate, None);
}
#[test]
fn test_predicate_conversion_with_single_condition_rev() {
- let sql = "1 < foo";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate().unwrap();
+ let predicate = convert_to_iceberg_predicate("1 < foo").unwrap();
assert_eq!(
predicate,
Reference::new("foo").greater_than(Datum::long(1))
);
}
+
#[test]
fn test_predicate_conversion_with_and_condition() {
let sql = "foo > 1 and bar = 'test'";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate().unwrap();
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
let expected_predicate = Predicate::and(
Reference::new("foo").greater_than(Datum::long(1)),
Reference::new("bar").equal_to(Datum::string("test")),
@@ -229,55 +325,42 @@ mod tests {
#[test]
fn test_predicate_conversion_with_and_condition_unsupported() {
- let sql = "foo > 1 and bar is not null";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate().unwrap();
+ let sql = "foo > 1 and length(bar) = 1";
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
let expected_predicate =
Reference::new("foo").greater_than(Datum::long(1));
assert_eq!(predicate, expected_predicate);
}
+
#[test]
fn test_predicate_conversion_with_and_condition_both_unsupported() {
- let sql = "foo in (1, 2, 3) and bar is not null";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate();
- let expected_predicate = None;
- assert_eq!(predicate, expected_predicate);
+ let sql = "foo in (1, 2, foo) and length(bar) = 1";
+ let predicate = convert_to_iceberg_predicate(sql);
+ assert_eq!(predicate, None);
}
#[test]
fn test_predicate_conversion_with_or_condition_unsupported() {
- let sql = "foo > 1 or bar is not null";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate();
- let expected_predicate = None;
+ let sql = "foo > 1 or length(bar) = 1";
+ let predicate = convert_to_iceberg_predicate(sql);
+ assert_eq!(predicate, None);
+ }
+
+ #[test]
+ fn test_predicate_conversion_with_or_condition_supported() {
+ let sql = "foo > 1 or bar = 'test'";
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
+ let expected_predicate = Predicate::or(
+ Reference::new("foo").greater_than(Datum::long(1)),
+ Reference::new("bar").equal_to(Datum::string("test")),
+ );
assert_eq!(predicate, expected_predicate);
}
#[test]
fn test_predicate_conversion_with_complex_binary_expr() {
let sql = "(foo > 1 and bar = 'test') or foo < 0 ";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate().unwrap();
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
+
let inner_predicate = Predicate::and(
Reference::new("foo").greater_than(Datum::long(1)),
Reference::new("bar").equal_to(Datum::string("test")),
@@ -290,46 +373,23 @@ mod tests {
}
#[test]
- fn test_predicate_conversion_with_complex_binary_expr_unsupported() {
- let sql = "(foo > 1 or bar in ('test', 'test2')) and foo < 0 ";
- let df_schema = create_test_schema();
- let expr = SessionContext::new()
- .parse_sql_expr(sql, &df_schema)
- .unwrap();
- let mut visitor = ExprToPredicateVisitor::new();
- expr.visit(&mut visitor).unwrap();
- let predicate = visitor.get_predicate().unwrap();
- let expected_predicate =
Reference::new("foo").less_than(Datum::long(0));
- assert_eq!(predicate, expected_predicate);
- }
+ fn test_predicate_conversion_with_one_and_expr_supported() {
+ let sql = "(foo > 1 and length(bar) = 1 ) or foo < 0 ";
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
- #[test]
- // test the get result method
- fn test_get_result_multiple() {
- let predicates = vec![
- Some(Reference::new("foo").greater_than(Datum::long(1))),
- None,
- Some(Reference::new("bar").equal_to(Datum::string("test"))),
- ];
- let stack = VecDeque::from(predicates);
- let visitor = ExprToPredicateVisitor { stack };
- assert_eq!(
- visitor.get_predicate(),
- Some(Predicate::and(
- Reference::new("foo").greater_than(Datum::long(1)),
- Reference::new("bar").equal_to(Datum::string("test")),
- ))
+ let inner_predicate =
Reference::new("foo").greater_than(Datum::long(1));
+ let expected_predicate = Predicate::or(
+ inner_predicate,
+ Reference::new("foo").less_than(Datum::long(0)),
);
+ assert_eq!(predicate, expected_predicate);
}
#[test]
- fn test_get_result_single() {
- let predicates =
vec![Some(Reference::new("foo").greater_than(Datum::long(1)))];
- let stack = VecDeque::from(predicates);
- let visitor = ExprToPredicateVisitor { stack };
- assert_eq!(
- visitor.get_predicate(),
- Some(Reference::new("foo").greater_than(Datum::long(1)))
- );
+ fn test_predicate_conversion_with_complex_binary_expr_unsupported() {
+ let sql = "(foo > 1 or length(bar) = 1 ) and foo < 0 ";
+ let predicate = convert_to_iceberg_predicate(sql).unwrap();
+ let expected_predicate =
Reference::new("foo").less_than(Datum::long(0));
+ assert_eq!(predicate, expected_predicate);
}
}
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs
b/crates/integrations/datafusion/src/physical_plan/scan.rs
index c53ce76d..59cf0997 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -22,7 +22,6 @@ use std::vec;
use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
-use datafusion::common::tree_node::TreeNode;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
@@ -35,7 +34,7 @@ use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;
-use crate::physical_plan::expr_to_predicate::ExprToPredicateVisitor;
+use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;
/// Manages the scanning process of an Iceberg [`Table`], encapsulating the
@@ -140,10 +139,13 @@ impl DisplayAs for IcebergTableScan {
) -> std::fmt::Result {
write!(
f,
- "IcebergTableScan projection:[{}]",
+ "IcebergTableScan projection:[{}] predicate:[{}]",
self.projection
.clone()
- .map_or(String::new(), |v| v.join(","))
+ .map_or(String::new(), |v| v.join(",")),
+ self.predicates
+ .clone()
+ .map_or(String::from(""), |p| format!("{}", p))
)
}
}
@@ -175,22 +177,6 @@ async fn get_batch_stream(
Ok(Box::pin(stream))
}
-/// Converts DataFusion filters ([`Expr`]) to an iceberg [`Predicate`].
-/// If none of the filters could be converted, return `None` which adds no
predicates to the scan operation.
-/// If the conversion was successful, return the converted predicates combined
with an AND operator.
-fn convert_filters_to_predicate(filters: &[Expr]) -> Option<Predicate> {
- filters
- .iter()
- .filter_map(|expr| {
- let mut visitor = ExprToPredicateVisitor::new();
- if expr.visit(&mut visitor).is_ok() {
- visitor.get_predicate()
- } else {
- None
- }
- })
- .reduce(Predicate::and)
-}
fn get_column_names(
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
diff --git a/crates/integrations/datafusion/src/table.rs
b/crates/integrations/datafusion/src/table.rs
index 2797e12d..bb24713a 100644
--- a/crates/integrations/datafusion/src/table.rs
+++ b/crates/integrations/datafusion/src/table.rs
@@ -23,7 +23,7 @@ use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
-use datafusion::logical_expr::{BinaryExpr, Expr, TableProviderFilterPushDown};
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::table::Table;
@@ -99,15 +99,8 @@ impl TableProvider for IcebergTableProvider {
filters: &[&Expr],
) -> std::result::Result<Vec<TableProviderFilterPushDown>,
datafusion::error::DataFusionError>
{
- let filter_support = filters
- .iter()
- .map(|e| match e {
- Expr::BinaryExpr(BinaryExpr { .. }) =>
TableProviderFilterPushDown::Inexact,
- _ => TableProviderFilterPushDown::Unsupported,
- })
- .collect::<Vec<TableProviderFilterPushDown>>();
-
- Ok(filter_support)
+ // Push down all filters, as a single source of truth, the scanner
will drop the filters which couldn't be push down
+ Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
}
diff --git
a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index d6e22d04..d320c8ef 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -204,10 +204,7 @@ async fn test_table_projection() -> Result<()> {
.unwrap();
assert_eq!(2, s.len());
// the first row is logical_plan, the second row is physical_plan
- assert_eq!(
- "IcebergTableScan projection:[foo1,foo2,foo3]",
- s.value(1).trim()
- );
+ assert!(s.value(1).contains("projection:[foo1,foo2,foo3]"));
// datafusion doesn't support query foo3.s_foo1, use foo3 instead
let records = table_df
@@ -226,7 +223,54 @@ async fn test_table_projection() -> Result<()> {
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(2, s.len());
- assert_eq!("IcebergTableScan projection:[foo1,foo3]", s.value(1).trim());
+ assert!(s
+ .value(1)
+ .contains("IcebergTableScan projection:[foo1,foo3]"));
Ok(())
}
+
+#[tokio::test]
+async fn test_table_predict_pushdown() -> Result<()> {
+ let iceberg_catalog = get_iceberg_catalog();
+ let namespace = NamespaceIdent::new("ns".to_string());
+ set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "foo",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "bar",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()?;
+ let creation = get_table_creation(temp_path(), "t1", Some(schema))?;
+ iceberg_catalog.create_table(&namespace, creation).await?;
+
+ let client = Arc::new(iceberg_catalog);
+ let catalog = Arc::new(IcebergCatalogProvider::try_new(client).await?);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog("catalog", catalog);
+ let records = ctx
+ .sql("select * from catalog.ns.t1 where (foo > 1 and length(bar) = 1 )
or bar is null")
+ .await
+ .unwrap()
+ .explain(false, false)
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ assert_eq!(1, records.len());
+ let record = &records[0];
+ // the first column is plan_type, the second column plan string.
+ let s = record
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(2, s.len());
+ // the first row is logical_plan, the second row is physical_plan
+ let expected = "predicate:[(foo > 1) OR (bar IS NULL)]";
+ assert!(s.value(1).trim().contains(expected));
+ Ok(())
+}