Copilot commented on code in PR #1886:
URL: https://github.com/apache/auron/pull/1886#discussion_r2684558254
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
Review Comment:
The AND predicate collection silently ignores sub-expressions that cannot be
converted to ORC predicates (i.e., those for which `convert_expr_to_orc`
returns `None`). This changes the semantics of the filter. For example, if the
expression is `(col = 1) AND (unsupported_expr)`, the result would be just
`col = 1`, which is less restrictive than the original predicate. When any
sub-expression in an AND cannot be converted, the entire AND expression should
return `None` to ensure correct filtering semantics, falling back to post-scan
filtering.
```suggestion
/// Inner helper that returns `true` if all sub-expressions under this
/// node were successfully converted, and `false` if any sub-expression
/// was unsupported.
fn collect_and_predicates_inner(
expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
schema: &SchemaRef,
predicates: &mut Vec<Predicate>,
) -> bool {
if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
if matches!(binary.op(), Operator::And) {
// Recursively collect AND sub-conditions from both sides.
// If either side fails to convert, the entire AND expression
// is considered unsupported.
if !collect_and_predicates_inner(binary.left(), schema,
predicates) {
return false;
}
if !collect_and_predicates_inner(binary.right(), schema,
predicates) {
return false;
}
return true;
}
}
// Not an AND expression, convert the whole expression
// (could be OR, comparison, IS NULL, etc.)
match convert_expr_to_orc(expr, schema) {
Some(pred) => {
predicates.push(pred);
true
}
None => {
// Any unsupported sub-expression in an AND causes the whole
// AND to be treated as unsupported.
false
}
}
}
// Use a local vector so that, if any part of this AND cannot be converted,
// we do not partially populate the caller's predicate list.
let mut local_predicates = Vec::new();
if collect_and_predicates_inner(expr, schema, &mut local_predicates) {
predicates.extend(local_predicates);
```
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::Or) {
+ // Recursively collect OR sub-conditions from both sides
+ collect_or_predicates(binary.left(), schema, predicates);
+ collect_or_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an OR expression, convert the whole expression
+ // (could be AND, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+fn convert_expr_to_orc(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle top-level AND expression, flatten all AND conditions
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ let mut predicates = Vec::new();
+ collect_and_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::and(predicates));
+ }
+
+ // Handle top-level OR expression, flatten all OR conditions
+ if matches!(binary.op(), Operator::Or) {
+ let mut predicates = Vec::new();
+ collect_or_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::or(predicates));
+ }
+ }
+
+ convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+fn convert_expr_to_orc_internal(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle Literal expressions (WHERE true, WHERE false, etc.)
+ if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+ match lit.value() {
+ ScalarValue::Boolean(Some(true)) => {
+ // WHERE true - no filtering needed, return None to skip
predicate
+ return None;
+ }
+ ScalarValue::Boolean(Some(false)) => {
+ // WHERE false - need to filter all data
+ // Create an impossible condition using a schema column if
available
+ // Use: column IS NULL AND column IS NOT NULL (always false)
+ if let Some(field) = schema.fields().first() {
+ let col_name = field.name().as_str();
+ return Some(Predicate::and(vec![
+ Predicate::is_null(col_name),
+ Predicate::not(Predicate::is_null(col_name)),
+ ]));
+ }
+ // Fallback: no columns in schema, can't create a predicate
+ return None;
Review Comment:
When handling `WHERE false` with an empty schema (no columns), the function
returns None, which means no predicate will be applied. This could lead to
incorrect behavior where all rows are returned instead of filtering out all
data. Consider returning an error or using a different fallback strategy that
ensures all data is filtered when the schema is empty.
```suggestion
// Fallback: no columns in schema, construct an impossible predicate
// using a synthetic column name to ensure all data is filtered.
let col_name = "__orc_where_false_constant__";
return Some(Predicate::and(vec![
Predicate::is_null(col_name),
Predicate::not(Predicate::is_null(col_name)),
]));
```
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::Or) {
+ // Recursively collect OR sub-conditions from both sides
+ collect_or_predicates(binary.left(), schema, predicates);
+ collect_or_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an OR expression, convert the whole expression
+ // (could be AND, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+fn convert_expr_to_orc(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle top-level AND expression, flatten all AND conditions
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ let mut predicates = Vec::new();
+ collect_and_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::and(predicates));
+ }
+
+ // Handle top-level OR expression, flatten all OR conditions
+ if matches!(binary.op(), Operator::Or) {
+ let mut predicates = Vec::new();
+ collect_or_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::or(predicates));
+ }
+ }
+
+ convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+fn convert_expr_to_orc_internal(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle Literal expressions (WHERE true, WHERE false, etc.)
+ if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+ match lit.value() {
+ ScalarValue::Boolean(Some(true)) => {
+ // WHERE true - no filtering needed, return None to skip
predicate
+ return None;
+ }
+ ScalarValue::Boolean(Some(false)) => {
+ // WHERE false - need to filter all data
+ // Create an impossible condition using a schema column if
available
+ // Use: column IS NULL AND column IS NOT NULL (always false)
+ if let Some(field) = schema.fields().first() {
+ let col_name = field.name().as_str();
+ return Some(Predicate::and(vec![
+ Predicate::is_null(col_name),
+ Predicate::not(Predicate::is_null(col_name)),
+ ]));
+ }
+ // Fallback: no columns in schema, can't create a predicate
+ return None;
+ }
+ _ => {
+ return None;
+ }
+ }
+ }
+
+ // Handle NOT expressions (WHERE NOT condition)
+ if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+ if let Some(inner_pred) = convert_expr_to_orc(not_expr.arg(), schema) {
+ return Some(Predicate::not(inner_pred));
+ }
+ return None;
+ }
+
+ // Handle IS NULL expressions
+ if let Some(is_null) = expr.as_any().downcast_ref::<IsNullExpr>() {
+ if let Some(col) = is_null.arg().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ return Some(Predicate::is_null(col_name));
+ }
+ return None;
+ }
+
+ // Handle IS NOT NULL expressions
+ if let Some(is_not_null) = expr.as_any().downcast_ref::<IsNotNullExpr>() {
+ if let Some(col) = is_not_null.arg().as_any().downcast_ref::<Column>()
{
+ let col_name = col.name();
+ return Some(Predicate::not(Predicate::is_null(col_name)));
+ }
+ return None;
+ }
+
+ // Handle IN expressions (WHERE col IN (val1, val2, ...))
+ if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
+ if let Some(col) = in_list.expr().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+
+ // Convert IN to multiple OR conditions: col = val1 OR col = val2
OR ...
+ let mut predicates = Vec::new();
+ for list_expr in in_list.list() {
+ if let Some(lit) =
list_expr.as_any().downcast_ref::<Literal>() {
+ if let Some(pred_value) =
convert_scalar_value(lit.value()) {
+ predicates.push(Predicate::eq(col_name, pred_value));
+ }
+ }
+ }
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ // If negated is true, it represents NOT IN
+ if in_list.negated() {
+ return Some(Predicate::not(Predicate::or(predicates)));
+ } else {
+ return Some(Predicate::or(predicates));
+ }
+ }
+ return None;
+ }
+
+ // Handle BinaryExpr (comparison operations)
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ let left = binary.left();
+ let right = binary.right();
+ let op = binary.op();
+
+ // AND/OR are already handled at the outer level, skip here
+ if matches!(op, Operator::And | Operator::Or) {
+ return None;
+ }
+
+ if let Some(col) = left.as_any().downcast_ref::<Column>() {
+ if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate(col_name, op, value);
+ }
+ }
+
+ if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
+ if let Some(col) = right.as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate_reversed(col_name, op,
value);
+ }
+ }
+ }
+
+ None
+}
+
+fn build_comparison_predicate(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::lte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::gte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn build_comparison_predicate_reversed(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::gte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::lte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn convert_scalar_value(value: &ScalarValue) -> Option<PredicateValue> {
+ match value {
+ ScalarValue::Boolean(v) => Some(PredicateValue::Boolean(*v)),
+ ScalarValue::Int8(v) => Some(PredicateValue::Int8(*v)),
+ ScalarValue::Int16(v) => Some(PredicateValue::Int16(*v)),
+ ScalarValue::Int32(v) => Some(PredicateValue::Int32(*v)),
+ ScalarValue::Int64(v) => Some(PredicateValue::Int64(*v)),
+ ScalarValue::Float32(v) => Some(PredicateValue::Float32(*v)),
+ ScalarValue::Float64(v) => Some(PredicateValue::Float64(*v)),
+ ScalarValue::Utf8(v) => Some(PredicateValue::Utf8(v.clone())),
+ ScalarValue::LargeUtf8(v) => Some(PredicateValue::Utf8(v.clone())),
Review Comment:
The `convert_scalar_value` function only handles a limited set of scalar
types (Boolean, Int8, Int16, Int32, Int64, Float32, Float64, Utf8, LargeUtf8).
Common types like Date, Timestamp, Decimal, Binary, and others are not
supported and will cause predicates using these types to be silently ignored
(return None). Consider adding support for additional types that ORC can
handle, or document which types are supported and which are not.
```suggestion
ScalarValue::LargeUtf8(v) => Some(PredicateValue::Utf8(v.clone())),
// Map common temporal types to their underlying integer representations
// so they can participate in ORC predicate pushdown.
ScalarValue::Date32(v) => Some(PredicateValue::Int32(*v)),
ScalarValue::Date64(v) => Some(PredicateValue::Int64(*v)),
ScalarValue::TimestampSecond(v, _) =>
Some(PredicateValue::Int64(*v)),
ScalarValue::TimestampMillisecond(v, _) =>
Some(PredicateValue::Int64(*v)),
ScalarValue::TimestampMicrosecond(v, _) =>
Some(PredicateValue::Int64(*v)),
ScalarValue::TimestampNanosecond(v, _) =>
Some(PredicateValue::Int64(*v)),
```
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::Or) {
+ // Recursively collect OR sub-conditions from both sides
+ collect_or_predicates(binary.left(), schema, predicates);
+ collect_or_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an OR expression, convert the whole expression
+ // (could be AND, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+fn convert_expr_to_orc(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle top-level AND expression, flatten all AND conditions
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ let mut predicates = Vec::new();
+ collect_and_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::and(predicates));
+ }
+
+ // Handle top-level OR expression, flatten all OR conditions
+ if matches!(binary.op(), Operator::Or) {
+ let mut predicates = Vec::new();
+ collect_or_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::or(predicates));
+ }
+ }
+
+ convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+fn convert_expr_to_orc_internal(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle Literal expressions (WHERE true, WHERE false, etc.)
+ if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+ match lit.value() {
+ ScalarValue::Boolean(Some(true)) => {
+ // WHERE true - no filtering needed, return None to skip
predicate
+ return None;
+ }
+ ScalarValue::Boolean(Some(false)) => {
+ // WHERE false - need to filter all data
+ // Create an impossible condition using a schema column if
available
+ // Use: column IS NULL AND column IS NOT NULL (always false)
+ if let Some(field) = schema.fields().first() {
+ let col_name = field.name().as_str();
+ return Some(Predicate::and(vec![
+ Predicate::is_null(col_name),
+ Predicate::not(Predicate::is_null(col_name)),
+ ]));
+ }
+ // Fallback: no columns in schema, can't create a predicate
+ return None;
+ }
+ _ => {
+ return None;
+ }
+ }
+ }
+
+ // Handle NOT expressions (WHERE NOT condition)
+ if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+ if let Some(inner_pred) = convert_expr_to_orc(not_expr.arg(), schema) {
+ return Some(Predicate::not(inner_pred));
+ }
+ return None;
+ }
+
+ // Handle IS NULL expressions
+ if let Some(is_null) = expr.as_any().downcast_ref::<IsNullExpr>() {
+ if let Some(col) = is_null.arg().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ return Some(Predicate::is_null(col_name));
+ }
+ return None;
+ }
+
+ // Handle IS NOT NULL expressions
+ if let Some(is_not_null) = expr.as_any().downcast_ref::<IsNotNullExpr>() {
+ if let Some(col) = is_not_null.arg().as_any().downcast_ref::<Column>()
{
+ let col_name = col.name();
+ return Some(Predicate::not(Predicate::is_null(col_name)));
+ }
+ return None;
+ }
+
+ // Handle IN expressions (WHERE col IN (val1, val2, ...))
+ if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
+ if let Some(col) = in_list.expr().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+
+ // Convert IN to multiple OR conditions: col = val1 OR col = val2
OR ...
+ let mut predicates = Vec::new();
+ for list_expr in in_list.list() {
+ if let Some(lit) =
list_expr.as_any().downcast_ref::<Literal>() {
+ if let Some(pred_value) =
convert_scalar_value(lit.value()) {
+ predicates.push(Predicate::eq(col_name, pred_value));
+ }
+ }
+ }
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ // If negated is true, it represents NOT IN
+ if in_list.negated() {
+ return Some(Predicate::not(Predicate::or(predicates)));
+ } else {
+ return Some(Predicate::or(predicates));
+ }
+ }
+ return None;
+ }
+
+ // Handle BinaryExpr (comparison operations)
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ let left = binary.left();
+ let right = binary.right();
+ let op = binary.op();
+
+ // AND/OR are already handled at the outer level, skip here
+ if matches!(op, Operator::And | Operator::Or) {
+ return None;
+ }
+
+ if let Some(col) = left.as_any().downcast_ref::<Column>() {
+ if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate(col_name, op, value);
+ }
+ }
+
+ if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
+ if let Some(col) = right.as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate_reversed(col_name, op,
value);
+ }
+ }
+ }
+
+ None
+}
+
+fn build_comparison_predicate(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::lte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::gte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn build_comparison_predicate_reversed(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::gte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::lte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn convert_scalar_value(value: &ScalarValue) -> Option<PredicateValue> {
+ match value {
+ ScalarValue::Boolean(v) => Some(PredicateValue::Boolean(*v)),
+ ScalarValue::Int8(v) => Some(PredicateValue::Int8(*v)),
+ ScalarValue::Int16(v) => Some(PredicateValue::Int16(*v)),
+ ScalarValue::Int32(v) => Some(PredicateValue::Int32(*v)),
+ ScalarValue::Int64(v) => Some(PredicateValue::Int64(*v)),
+ ScalarValue::Float32(v) => Some(PredicateValue::Float32(*v)),
+ ScalarValue::Float64(v) => Some(PredicateValue::Float64(*v)),
+ ScalarValue::Utf8(v) => Some(PredicateValue::Utf8(v.clone())),
+ ScalarValue::LargeUtf8(v) => Some(PredicateValue::Utf8(v.clone())),
Review Comment:
The `convert_scalar_value` function doesn't properly handle NULL values
within ScalarValue variants. When a ScalarValue contains an Option::None (e.g.,
ScalarValue::Int32(None)), the function still wraps it in
Some(PredicateValue::Int32(None)). This could lead to incorrect predicate
behavior when comparing columns to NULL literals.
The function should check if the inner Option is None and return None in
that case, since NULL comparisons should be handled separately using IS NULL/IS
NOT NULL predicates rather than equality comparisons.
```suggestion
ScalarValue::Boolean(Some(v)) =>
Some(PredicateValue::Boolean(Some(*v))),
ScalarValue::Boolean(None) => None,
ScalarValue::Int8(Some(v)) => Some(PredicateValue::Int8(Some(*v))),
ScalarValue::Int8(None) => None,
ScalarValue::Int16(Some(v)) => Some(PredicateValue::Int16(Some(*v))),
ScalarValue::Int16(None) => None,
ScalarValue::Int32(Some(v)) => Some(PredicateValue::Int32(Some(*v))),
ScalarValue::Int32(None) => None,
ScalarValue::Int64(Some(v)) => Some(PredicateValue::Int64(Some(*v))),
ScalarValue::Int64(None) => None,
ScalarValue::Float32(Some(v)) =>
Some(PredicateValue::Float32(Some(*v))),
ScalarValue::Float32(None) => None,
ScalarValue::Float64(Some(v)) =>
Some(PredicateValue::Float64(Some(*v))),
ScalarValue::Float64(None) => None,
ScalarValue::Utf8(Some(v)) =>
Some(PredicateValue::Utf8(Some(v.clone()))),
ScalarValue::Utf8(None) => None,
ScalarValue::LargeUtf8(Some(v)) =>
Some(PredicateValue::Utf8(Some(v.clone()))),
ScalarValue::LargeUtf8(None) => None,
```
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::Or) {
+ // Recursively collect OR sub-conditions from both sides
+ collect_or_predicates(binary.left(), schema, predicates);
+ collect_or_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an OR expression, convert the whole expression
+ // (could be AND, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+fn convert_expr_to_orc(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle top-level AND expression, flatten all AND conditions
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ let mut predicates = Vec::new();
+ collect_and_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::and(predicates));
+ }
+
+ // Handle top-level OR expression, flatten all OR conditions
+ if matches!(binary.op(), Operator::Or) {
+ let mut predicates = Vec::new();
+ collect_or_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::or(predicates));
+ }
+ }
+
+ convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+fn convert_expr_to_orc_internal(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle Literal expressions (WHERE true, WHERE false, etc.)
+ if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+ match lit.value() {
+ ScalarValue::Boolean(Some(true)) => {
+ // WHERE true - no filtering needed, return None to skip
predicate
+ return None;
+ }
+ ScalarValue::Boolean(Some(false)) => {
+ // WHERE false - need to filter all data
+ // Create an impossible condition using a schema column if
available
+ // Use: column IS NULL AND column IS NOT NULL (always false)
+ if let Some(field) = schema.fields().first() {
+ let col_name = field.name().as_str();
+ return Some(Predicate::and(vec![
+ Predicate::is_null(col_name),
+ Predicate::not(Predicate::is_null(col_name)),
+ ]));
+ }
+ // Fallback: no columns in schema, can't create a predicate
+ return None;
+ }
+ _ => {
+ return None;
+ }
+ }
+ }
+
+ // Handle NOT expressions (WHERE NOT condition)
+ if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+ if let Some(inner_pred) = convert_expr_to_orc(not_expr.arg(), schema) {
+ return Some(Predicate::not(inner_pred));
+ }
+ return None;
+ }
+
+ // Handle IS NULL expressions
+ if let Some(is_null) = expr.as_any().downcast_ref::<IsNullExpr>() {
+ if let Some(col) = is_null.arg().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ return Some(Predicate::is_null(col_name));
+ }
+ return None;
+ }
+
+ // Handle IS NOT NULL expressions
+ if let Some(is_not_null) = expr.as_any().downcast_ref::<IsNotNullExpr>() {
+ if let Some(col) = is_not_null.arg().as_any().downcast_ref::<Column>()
{
+ let col_name = col.name();
+ return Some(Predicate::not(Predicate::is_null(col_name)));
+ }
+ return None;
+ }
+
+ // Handle IN expressions (WHERE col IN (val1, val2, ...))
+ if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
+ if let Some(col) = in_list.expr().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+
+ // Convert IN to multiple OR conditions: col = val1 OR col = val2
OR ...
+ let mut predicates = Vec::new();
+ for list_expr in in_list.list() {
+ if let Some(lit) =
list_expr.as_any().downcast_ref::<Literal>() {
+ if let Some(pred_value) =
convert_scalar_value(lit.value()) {
+ predicates.push(Predicate::eq(col_name, pred_value));
+ }
+ }
+ }
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ // If negated is true, it represents NOT IN
+ if in_list.negated() {
+ return Some(Predicate::not(Predicate::or(predicates)));
+ } else {
+ return Some(Predicate::or(predicates));
+ }
+ }
+ return None;
+ }
+
+ // Handle BinaryExpr (comparison operations)
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ let left = binary.left();
+ let right = binary.right();
+ let op = binary.op();
+
+ // AND/OR are already handled at the outer level, skip here
+ if matches!(op, Operator::And | Operator::Or) {
+ return None;
+ }
+
+ if let Some(col) = left.as_any().downcast_ref::<Column>() {
+ if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate(col_name, op, value);
+ }
+ }
+
+ if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
+ if let Some(col) = right.as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate_reversed(col_name, op,
value);
+ }
+ }
+ }
+
+ None
+}
+
+fn build_comparison_predicate(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::lte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::gte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn build_comparison_predicate_reversed(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::gte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::lte(col_name, predicate_value)),
+ _ => None,
+ }
+}
Review Comment:
The `build_comparison_predicate` and `build_comparison_predicate_reversed`
functions lack documentation. Adding documentation would clarify why there are
two separate functions and how the reversal logic works for different operators
(e.g., how `Lt` becomes `Gt` when reversed).
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
Review Comment:
The `convert_predicate_to_orc` function lacks documentation explaining its
purpose, parameters, return value, and, importantly, what happens when a
predicate cannot be converted (it returns `None`, as it also does when no
predicate is supplied). Adding comprehensive documentation would help future
maintainers understand that returning `None` means nothing is pushed down to
the ORC reader and the predicate must be handled by post-scan filtering.
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::Or) {
+ // Recursively collect OR sub-conditions from both sides
+ collect_or_predicates(binary.left(), schema, predicates);
+ collect_or_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an OR expression, convert the whole expression
+ // (could be AND, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
Review Comment:
Similar to the AND case, OR predicate collection silently ignores
sub-expressions that cannot be converted. For OR expressions this is the more
dangerous case rather than the more acceptable one: reducing
`(col = 1) OR (unsupported_expr)` to `col = 1` makes the pushed-down predicate
stricter than the original, so rows that satisfy only `unsupported_expr` can be
pruned during the scan, and post-scan filtering cannot bring them back. The
collection should track whether any sub-expression failed conversion and, if
so, skip predicate pushdown for the entire OR expression to maintain exact
semantics.
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+fn convert_predicate_to_orc(
+ predicate: Option<PhysicalExprRef>,
+ file_schema: &SchemaRef,
+) -> Option<Predicate> {
+ let predicate = predicate?;
+ convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ // Recursively collect AND sub-conditions from both sides
+ collect_and_predicates(binary.left(), schema, predicates);
+ collect_and_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an AND expression, convert the whole expression
+ // (could be OR, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+ predicates: &mut Vec<Predicate>,
+) {
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::Or) {
+ // Recursively collect OR sub-conditions from both sides
+ collect_or_predicates(binary.left(), schema, predicates);
+ collect_or_predicates(binary.right(), schema, predicates);
+ return;
+ }
+ }
+
+ // Not an OR expression, convert the whole expression
+ // (could be AND, comparison, IS NULL, etc.)
+ if let Some(pred) = convert_expr_to_orc(expr, schema) {
+ predicates.push(pred);
+ }
+}
+
+fn convert_expr_to_orc(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle top-level AND expression, flatten all AND conditions
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ if matches!(binary.op(), Operator::And) {
+ let mut predicates = Vec::new();
+ collect_and_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::and(predicates));
+ }
+
+ // Handle top-level OR expression, flatten all OR conditions
+ if matches!(binary.op(), Operator::Or) {
+ let mut predicates = Vec::new();
+ collect_or_predicates(expr, schema, &mut predicates);
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ if predicates.len() == 1 {
+ return Some(predicates.into_iter().next().unwrap());
+ }
+
+ return Some(Predicate::or(predicates));
+ }
+ }
+
+ convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+fn convert_expr_to_orc_internal(
+ expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ schema: &SchemaRef,
+) -> Option<Predicate> {
+ // Handle Literal expressions (WHERE true, WHERE false, etc.)
+ if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+ match lit.value() {
+ ScalarValue::Boolean(Some(true)) => {
+ // WHERE true - no filtering needed, return None to skip
predicate
+ return None;
+ }
+ ScalarValue::Boolean(Some(false)) => {
+ // WHERE false - need to filter all data
+ // Create an impossible condition using a schema column if
available
+ // Use: column IS NULL AND column IS NOT NULL (always false)
+ if let Some(field) = schema.fields().first() {
+ let col_name = field.name().as_str();
+ return Some(Predicate::and(vec![
+ Predicate::is_null(col_name),
+ Predicate::not(Predicate::is_null(col_name)),
+ ]));
+ }
+ // Fallback: no columns in schema, can't create a predicate
+ return None;
+ }
+ _ => {
+ return None;
+ }
+ }
+ }
+
+ // Handle NOT expressions (WHERE NOT condition)
+ if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+ if let Some(inner_pred) = convert_expr_to_orc(not_expr.arg(), schema) {
+ return Some(Predicate::not(inner_pred));
+ }
+ return None;
+ }
+
+ // Handle IS NULL expressions
+ if let Some(is_null) = expr.as_any().downcast_ref::<IsNullExpr>() {
+ if let Some(col) = is_null.arg().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ return Some(Predicate::is_null(col_name));
+ }
+ return None;
+ }
+
+ // Handle IS NOT NULL expressions
+ if let Some(is_not_null) = expr.as_any().downcast_ref::<IsNotNullExpr>() {
+ if let Some(col) = is_not_null.arg().as_any().downcast_ref::<Column>()
{
+ let col_name = col.name();
+ return Some(Predicate::not(Predicate::is_null(col_name)));
+ }
+ return None;
+ }
+
+ // Handle IN expressions (WHERE col IN (val1, val2, ...))
+ if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
+ if let Some(col) = in_list.expr().as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+
+ // Convert IN to multiple OR conditions: col = val1 OR col = val2
OR ...
+ let mut predicates = Vec::new();
+ for list_expr in in_list.list() {
+ if let Some(lit) =
list_expr.as_any().downcast_ref::<Literal>() {
+ if let Some(pred_value) =
convert_scalar_value(lit.value()) {
+ predicates.push(Predicate::eq(col_name, pred_value));
+ }
+ }
+ }
+
+ if predicates.is_empty() {
+ return None;
+ }
+
+ // If negated is true, it represents NOT IN
+ if in_list.negated() {
+ return Some(Predicate::not(Predicate::or(predicates)));
+ } else {
+ return Some(Predicate::or(predicates));
+ }
+ }
+ return None;
+ }
+
+ // Handle BinaryExpr (comparison operations)
+ if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+ let left = binary.left();
+ let right = binary.right();
+ let op = binary.op();
+
+ // AND/OR are already handled at the outer level, skip here
+ if matches!(op, Operator::And | Operator::Or) {
+ return None;
+ }
+
+ if let Some(col) = left.as_any().downcast_ref::<Column>() {
+ if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate(col_name, op, value);
+ }
+ }
+
+ if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
+ if let Some(col) = right.as_any().downcast_ref::<Column>() {
+ let col_name = col.name();
+ let value = lit.value();
+ return build_comparison_predicate_reversed(col_name, op,
value);
+ }
+ }
+ }
+
+ None
+}
+
+fn build_comparison_predicate(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::lte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::gte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn build_comparison_predicate_reversed(
+ col_name: &str,
+ op: &Operator,
+ value: &ScalarValue,
+) -> Option<Predicate> {
+ let predicate_value = convert_scalar_value(value)?;
+
+ match op {
+ Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+ Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+ Operator::Lt => Some(Predicate::gt(col_name, predicate_value)),
+ Operator::LtEq => Some(Predicate::gte(col_name, predicate_value)),
+ Operator::Gt => Some(Predicate::lt(col_name, predicate_value)),
+ Operator::GtEq => Some(Predicate::lte(col_name, predicate_value)),
+ _ => None,
+ }
+}
+
+fn convert_scalar_value(value: &ScalarValue) -> Option<PredicateValue> {
+ match value {
+ ScalarValue::Boolean(v) => Some(PredicateValue::Boolean(*v)),
+ ScalarValue::Int8(v) => Some(PredicateValue::Int8(*v)),
+ ScalarValue::Int16(v) => Some(PredicateValue::Int16(*v)),
+ ScalarValue::Int32(v) => Some(PredicateValue::Int32(*v)),
+ ScalarValue::Int64(v) => Some(PredicateValue::Int64(*v)),
+ ScalarValue::Float32(v) => Some(PredicateValue::Float32(*v)),
+ ScalarValue::Float64(v) => Some(PredicateValue::Float64(*v)),
+ ScalarValue::Utf8(v) => Some(PredicateValue::Utf8(v.clone())),
+ ScalarValue::LargeUtf8(v) => Some(PredicateValue::Utf8(v.clone())),
+ _ => None,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::{
+ logical_expr::Operator,
+ physical_expr::expressions::{
+ BinaryExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, NotExpr,
+ },
+ scalar::ScalarValue,
+ };
+
+ use super::*;
+
+ fn create_test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("age", DataType::Int32, true),
+ Field::new("score", DataType::Float64, true),
+ ]))
+ }
+
+ #[test]
+ fn test_literal_true() {
+ let schema = create_test_schema();
+ let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ // WHERE true should return None (no filtering)
+ assert!(result.is_none());
+ }
+
+ #[test]
+ fn test_literal_false() {
+ let schema = create_test_schema();
+ let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ // WHERE false should return a predicate that filters all data
+ assert!(result.is_some());
+ }
+
+ #[test]
+ fn test_comparison_eq() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("id", 0));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(
+ format!("{:?}", predicate),
+ "Comparison { column: \"id\", op: Equal, value: Int32(Some(42)) }"
+ );
+ }
+
+ #[test]
+ fn test_comparison_ne() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("name", 1));
+ let lit =
Arc::new(Literal::new(ScalarValue::Utf8(Some("test".to_string()))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::NotEq, lit));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(
+ format!("{:?}", predicate),
+ "Comparison { column: \"name\", op: NotEqual, value:
Utf8(Some(\"test\")) }"
+ );
+ }
+
+ #[test]
+ fn test_comparison_lt_gt_lte_gte() {
+ let schema = create_test_schema();
+
+ // Test LT
+ let col = Arc::new(Column::new("age", 2));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(30))));
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Lt,
lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test GT
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Gt,
lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test LtEq
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::LtEq,
lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test GtEq
+ let expr = Arc::new(BinaryExpr::new(col, Operator::GtEq, lit));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ }
+
+ #[test]
+ fn test_comparison_reversed() {
+ let schema = create_test_schema();
+ // Literal on left: 42 = id (should be reversed to: id = 42)
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let col = Arc::new(Column::new("id", 0));
+ let expr = Arc::new(BinaryExpr::new(lit, Operator::Eq, col));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ }
+
+ #[test]
+ fn test_is_null() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("name", 1));
+ let expr = Arc::new(IsNullExpr::new(col));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(format!("{:?}", predicate), "IsNull { column: \"name\" }");
+ }
+
+ #[test]
+ fn test_is_not_null() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("age", 2));
+ let expr = Arc::new(IsNotNullExpr::new(col));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(
+ format!("{:?}", predicate),
+ "Not(IsNull { column: \"age\" })"
+ );
+ }
+
+ #[test]
+ fn test_not_expr() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("id", 0));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let eq_expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ let not_expr = Arc::new(NotExpr::new(eq_expr));
+
+ let result = convert_predicate_to_orc(Some(not_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert!(format!("{:?}", predicate).starts_with("Not(Comparison"));
+ }
+
+ #[test]
+ fn test_in_list() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("id", 0));
+ let values = vec![
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(2))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(3))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ ];
+ let expr = Arc::new(InListExpr::new(col, values, false, None));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ // IN list should be converted to OR of equality predicates
+ assert!(format!("{:?}", predicate).starts_with("Or(["));
+ }
+
+ #[test]
+ fn test_not_in_list() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("name", 1));
+ let values = vec![
+ Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ Arc::new(Literal::new(ScalarValue::Utf8(Some("bar".to_string()))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ ];
+ let expr = Arc::new(InListExpr::new(col, values, true, None));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ // NOT IN should be converted to NOT(OR(...))
+ assert!(format!("{:?}", predicate).starts_with("Not(Or(["));
+ }
+
+ #[test]
+ fn test_and_simple() {
+ let schema = create_test_schema();
+ // id = 42 AND age > 18
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+ let and_expr = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+ let result = convert_predicate_to_orc(Some(and_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert!(format!("{:?}", predicate).starts_with("And(["));
+ }
+
+ #[test]
+ fn test_and_nested_flattening() {
+ let schema = create_test_schema();
+ // ((id = 1 AND age = 2) AND name = "foo") should be flattened to
And([...])
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+ let and1 = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+ let col3 = Arc::new(Column::new("name", 1));
+ let lit3 =
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))));
+ let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+ let and2 = Arc::new(BinaryExpr::new(and1, Operator::And, expr3));
+
+ let result = convert_predicate_to_orc(Some(and2), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be flattened to And([cond1, cond2, cond3])
+ assert!(debug_str.starts_with("And(["));
+ // Count the number of conditions (should be 3)
+ let condition_count = debug_str.matches("Comparison").count();
+ assert_eq!(condition_count, 3);
+ }
+
+ #[test]
+ fn test_or_simple() {
+ let schema = create_test_schema();
+ // id = 1 OR id = 2
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq,
lit1));
+
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+ let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+ let result = convert_predicate_to_orc(Some(or_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert!(format!("{:?}", predicate).starts_with("Or(["));
+ }
+
+ #[test]
+ fn test_or_nested_flattening() {
+ let schema = create_test_schema();
+ // ((id = 1 OR age = 2) OR score = 3.0) should be flattened
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+ let or1 = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+ let col3 = Arc::new(Column::new("score", 3));
+ let lit3 = Arc::new(Literal::new(ScalarValue::Float64(Some(3.0))));
+ let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+ let or2 = Arc::new(BinaryExpr::new(or1, Operator::Or, expr3));
+
+ let result = convert_predicate_to_orc(Some(or2), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be flattened to Or([cond1, cond2, cond3])
+ assert!(debug_str.starts_with("Or(["));
+ let condition_count = debug_str.matches("Comparison").count();
+ assert_eq!(condition_count, 3);
+ }
+
+ #[test]
+ fn test_complex_mixed_predicates() {
+ let schema = create_test_schema();
+ // (id = 1 OR id = 2) AND name IS NOT NULL
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq,
lit1));
+
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+ let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+ let col2 = Arc::new(Column::new("name", 1));
+ let is_not_null = Arc::new(IsNotNullExpr::new(col2));
+
+ let and_expr = Arc::new(BinaryExpr::new(or_expr, Operator::And,
is_not_null));
+
+ let result = convert_predicate_to_orc(Some(and_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should have And at top level
+ assert!(
+ debug_str.contains("And"),
+ "Expected And, got: {}",
+ debug_str
+ );
+ // Should contain OR for the id conditions
+ assert!(debug_str.contains("Or"), "Expected Or, got: {}", debug_str);
+ // Should contain the IS NOT NULL condition
+ assert!(
+ debug_str.contains("IsNull"),
+ "Expected IsNull, got: {}",
+ debug_str
+ );
+ }
+
+ #[test]
+ fn test_deeply_nested_and() {
+ let schema = create_test_schema();
+ // Build: (((id = 1 AND age = 2) AND name = "foo") AND score = 3.0)
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+ let and1 = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+ let col3 = Arc::new(Column::new("name", 1));
+ let lit3 =
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))));
+ let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+ let and2 = Arc::new(BinaryExpr::new(and1, Operator::And, expr3));
+
+ let col4 = Arc::new(Column::new("score", 3));
+ let lit4 = Arc::new(Literal::new(ScalarValue::Float64(Some(3.0))));
+ let expr4 = Arc::new(BinaryExpr::new(col4, Operator::Eq, lit4));
+
+ let and3 = Arc::new(BinaryExpr::new(and2, Operator::And, expr4));
+
+ let result = convert_predicate_to_orc(Some(and3), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be flattened to And([cond1, cond2, cond3, cond4])
+ assert!(debug_str.starts_with("And(["));
+ let condition_count = debug_str.matches("Comparison").count();
+ assert_eq!(condition_count, 4);
+ }
+
+ #[test]
+ fn test_all_scalar_types() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("col_bool", DataType::Boolean, true),
+ Field::new("col_i8", DataType::Int8, true),
+ Field::new("col_i16", DataType::Int16, true),
+ Field::new("col_i32", DataType::Int32, true),
+ Field::new("col_i64", DataType::Int64, true),
+ Field::new("col_f32", DataType::Float32, true),
+ Field::new("col_f64", DataType::Float64, true),
+ Field::new("col_utf8", DataType::Utf8, true),
+ ]));
+
+ // Test Boolean
+ let col = Arc::new(Column::new("col_bool", 0));
+ let lit = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Int8
+ let col = Arc::new(Column::new("col_i8", 1));
+ let lit = Arc::new(Literal::new(ScalarValue::Int8(Some(42))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Int16
+ let col = Arc::new(Column::new("col_i16", 2));
+ let lit = Arc::new(Literal::new(ScalarValue::Int16(Some(1000))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Int32
+ let col = Arc::new(Column::new("col_i32", 3));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(100000))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Int64
+ let col = Arc::new(Column::new("col_i64", 4));
+ let lit = Arc::new(Literal::new(ScalarValue::Int64(Some(1000000000))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Float32
+ let col = Arc::new(Column::new("col_f32", 5));
+ let lit = Arc::new(Literal::new(ScalarValue::Float32(Some(3.14))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Float64
+ let col = Arc::new(Column::new("col_f64", 6));
+ let lit = Arc::new(Literal::new(ScalarValue::Float64(Some(2.718))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+ // Test Utf8
+ let col = Arc::new(Column::new("col_utf8", 7));
+ let lit =
Arc::new(Literal::new(ScalarValue::Utf8(Some("test".to_string()))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+ }
+}
Review Comment:
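The test suite is missing coverage for NULL literal values in predicates.
There are no tests for cases like "WHERE col = NULL" or "WHERE col IN (1, NULL,
3)". These edge cases should be tested to ensure NULL values in comparison
expressions are handled correctly: in SQL, a comparison against NULL evaluates
to NULL rather than true, and `convert_scalar_value` currently passes `None`
scalars straight through into the ORC predicate value.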
##########
native-engine/datafusion-ext-plans/src/orc_exec.rs:
##########
@@ -414,3 +430,698 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+/// Converts an optional DataFusion filter expression into an ORC
+/// [`Predicate`].
+///
+/// Returns `None` when no filter is supplied, or when `convert_expr_to_orc`
+/// produces no predicate for the supplied filter.
+fn convert_predicate_to_orc(
+    predicate: Option<PhysicalExprRef>,
+    file_schema: &SchemaRef,
+) -> Option<Predicate> {
+    predicate.and_then(|filter| convert_expr_to_orc(&filter, file_schema))
+}
+
+/// Flattens a (possibly nested) chain of `AND` nodes, appending the ORC
+/// predicate of every operand to `predicates` in left-to-right order.
+///
+/// An operand that is not itself an `AND` node is converted as a whole via
+/// `convert_expr_to_orc` (it may be an OR, a comparison, IS NULL, etc.).
+/// Operands for which that conversion yields `None` are skipped without any
+/// signal to the caller.
+fn collect_and_predicates(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+    predicates: &mut Vec<Predicate>,
+) {
+    let conjunction = expr
+        .as_any()
+        .downcast_ref::<BinaryExpr>()
+        .filter(|node| matches!(node.op(), Operator::And));
+
+    match conjunction {
+        // Still inside the AND chain: descend into the left side, then the right
+        Some(node) => {
+            collect_and_predicates(node.left(), schema, predicates);
+            collect_and_predicates(node.right(), schema, predicates);
+        }
+        // Operand of the chain: contributes at most one predicate
+        None => predicates.extend(convert_expr_to_orc(expr, schema)),
+    }
+}
+
+/// Flattens a (possibly nested) chain of `OR` nodes, appending the ORC
+/// predicate of every operand to `predicates` in left-to-right order.
+///
+/// An operand that is not itself an `OR` node is converted as a whole via
+/// `convert_expr_to_orc` (it may be an AND, a comparison, IS NULL, etc.).
+/// Operands for which that conversion yields `None` are skipped without any
+/// signal to the caller.
+fn collect_or_predicates(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+    predicates: &mut Vec<Predicate>,
+) {
+    let disjunction = expr
+        .as_any()
+        .downcast_ref::<BinaryExpr>()
+        .filter(|node| matches!(node.op(), Operator::Or));
+
+    match disjunction {
+        // Still inside the OR chain: descend into the left side, then the right
+        Some(node) => {
+            collect_or_predicates(node.left(), schema, predicates);
+            collect_or_predicates(node.right(), schema, predicates);
+        }
+        // Operand of the chain: contributes at most one predicate
+        None => predicates.extend(convert_expr_to_orc(expr, schema)),
+    }
+}
+
+/// Converts a DataFusion physical expression into an ORC [`Predicate`].
+///
+/// Chains of `AND` / `OR` nodes are flattened into a single
+/// `Predicate::and` / `Predicate::or`; every other expression is delegated
+/// to `convert_expr_to_orc_internal`.
+///
+/// Conversion of a chain is all-or-nothing: if any operand cannot be
+/// converted, `None` is returned for the whole chain so the caller falls
+/// back to not pushing the filter down. Dropping an operand would change the
+/// meaning of the filter: `a OR <unsupported>` would shrink to `a`, and
+/// `NOT (a AND <unsupported>)` would shrink to `NOT a`, both of which reject
+/// rows the original expression accepts. Note that a literal `true` operand
+/// also converts to `None` and therefore disables pushdown of its chain.
+fn convert_expr_to_orc(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Option<Predicate> {
+    /// Counts the operands of a chain of `op` nodes once nested `op` nodes
+    /// are flattened (an expression that is not an `op` node counts as one).
+    fn count_operands(
+        expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+        op: &Operator,
+    ) -> usize {
+        match expr.as_any().downcast_ref::<BinaryExpr>() {
+            Some(binary) if binary.op() == op => {
+                count_operands(binary.left(), op) + count_operands(binary.right(), op)
+            }
+            _ => 1,
+        }
+    }
+
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        let op = binary.op();
+        let is_and = matches!(op, Operator::And);
+
+        if is_and || matches!(op, Operator::Or) {
+            // Flatten the whole chain into one list of operand predicates
+            let mut predicates = Vec::new();
+            if is_and {
+                collect_and_predicates(expr, schema, &mut predicates);
+            } else {
+                collect_or_predicates(expr, schema, &mut predicates);
+            }
+
+            // Every operand contributes at most one predicate, so a shortfall
+            // means at least one operand could not be converted.
+            if predicates.len() != count_operands(expr, op) {
+                return None;
+            }
+
+            return Some(if is_and {
+                Predicate::and(predicates)
+            } else {
+                Predicate::or(predicates)
+            });
+        }
+    }
+
+    convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+///
+/// Supported shapes:
+/// - boolean literals (`WHERE true` / `WHERE false`),
+/// - `NOT <expr>` where `<expr>` itself converts,
+/// - `<column> IS NULL` and `<column> IS NOT NULL`,
+/// - `<column> [NOT] IN (<literal>, ...)`,
+/// - comparisons between a column and a literal, in either order.
+///
+/// Returns `None` for anything else, including `AND` / `OR` nodes (those are
+/// handled by `convert_expr_to_orc`).
+///
+/// NOTE(review): NULL literals are forwarded as `PredicateValue::*(None)` by
+/// `convert_scalar_value`; how the ORC reader evaluates such values is not
+/// visible from here and should be confirmed.
+fn convert_expr_to_orc_internal(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Option<Predicate> {
+    // Handle Literal expressions (WHERE true, WHERE false, etc.)
+    if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+        return match lit.value() {
+            ScalarValue::Boolean(Some(false)) => {
+                // WHERE false - need to filter all data. Build an impossible
+                // condition on the first schema column:
+                // column IS NULL AND column IS NOT NULL (always false).
+                // With no columns in the schema no predicate can be built.
+                schema.fields().first().map(|field| {
+                    let col_name = field.name().as_str();
+                    Predicate::and(vec![
+                        Predicate::is_null(col_name),
+                        Predicate::not(Predicate::is_null(col_name)),
+                    ])
+                })
+            }
+            // WHERE true needs no filtering; other literals are not handled
+            _ => None,
+        };
+    }
+
+    // Handle NOT expressions (WHERE NOT condition)
+    if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+        return convert_expr_to_orc(not_expr.arg(), schema)
+            .map(|inner_pred| Predicate::not(inner_pred));
+    }
+
+    // Handle IS NULL expressions (only directly on a column)
+    if let Some(is_null) = expr.as_any().downcast_ref::<IsNullExpr>() {
+        return is_null
+            .arg()
+            .as_any()
+            .downcast_ref::<Column>()
+            .map(|col| Predicate::is_null(col.name()));
+    }
+
+    // Handle IS NOT NULL expressions (only directly on a column)
+    if let Some(is_not_null) = expr.as_any().downcast_ref::<IsNotNullExpr>() {
+        return is_not_null
+            .arg()
+            .as_any()
+            .downcast_ref::<Column>()
+            .map(|col| Predicate::not(Predicate::is_null(col.name())));
+    }
+
+    // Handle IN expressions (WHERE col IN (val1, val2, ...))
+    if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
+        let col = in_list.expr().as_any().downcast_ref::<Column>()?;
+        let col_name = col.name();
+
+        // Convert IN to multiple OR conditions: col = val1 OR col = val2 OR ...
+        // Every list item must be a convertible literal. Skipping an item
+        // would make the pushed-down predicate stricter than the original
+        // filter, so a single unsupported item abandons the conversion.
+        let predicates = in_list
+            .list()
+            .iter()
+            .map(|list_expr| {
+                let lit = list_expr.as_any().downcast_ref::<Literal>()?;
+                let pred_value = convert_scalar_value(lit.value())?;
+                Some(Predicate::eq(col_name, pred_value))
+            })
+            .collect::<Option<Vec<_>>>()?;
+
+        if predicates.is_empty() {
+            return None;
+        }
+
+        // If negated is true, it represents NOT IN
+        let any_match = Predicate::or(predicates);
+        return Some(if in_list.negated() {
+            Predicate::not(any_match)
+        } else {
+            any_match
+        });
+    }
+
+    // Handle BinaryExpr (comparison operations)
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        let left = binary.left();
+        let right = binary.right();
+        let op = binary.op();
+
+        // AND/OR are already handled at the outer level, skip here
+        if matches!(op, Operator::And | Operator::Or) {
+            return None;
+        }
+
+        // <column> <op> <literal>
+        if let Some(col) = left.as_any().downcast_ref::<Column>() {
+            if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
+                return build_comparison_predicate(col.name(), op, lit.value());
+            }
+        }
+
+        // <literal> <op> <column>: the operator has to be mirrored
+        if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
+            if let Some(col) = right.as_any().downcast_ref::<Column>() {
+                return build_comparison_predicate_reversed(col.name(), op, lit.value());
+            }
+        }
+    }
+
+    None
+}
+
+/// Builds the ORC predicate for `<col_name> <op> <value>`.
+///
+/// Returns `None` when `value` has no `PredicateValue` counterpart or when
+/// `op` is not one of `=`, `!=`, `<`, `<=`, `>`, `>=`.
+fn build_comparison_predicate(
+    col_name: &str,
+    op: &Operator,
+    value: &ScalarValue,
+) -> Option<Predicate> {
+    convert_scalar_value(value).and_then(|operand| match op {
+        Operator::Eq => Some(Predicate::eq(col_name, operand)),
+        Operator::NotEq => Some(Predicate::ne(col_name, operand)),
+        Operator::Lt => Some(Predicate::lt(col_name, operand)),
+        Operator::LtEq => Some(Predicate::lte(col_name, operand)),
+        Operator::Gt => Some(Predicate::gt(col_name, operand)),
+        Operator::GtEq => Some(Predicate::gte(col_name, operand)),
+        _ => None,
+    })
+}
+
+/// Builds the ORC predicate for `<value> <op> <col_name>`, i.e. a comparison
+/// whose literal is on the left-hand side.
+///
+/// The operator is mirrored (`<` becomes `>`, `<=` becomes `>=`, and vice
+/// versa; `=` and `!=` are unchanged) so the result is expressed with the
+/// column on the left. Returns `None` for any other operator or when `value`
+/// has no `PredicateValue` counterpart.
+fn build_comparison_predicate_reversed(
+    col_name: &str,
+    op: &Operator,
+    value: &ScalarValue,
+) -> Option<Predicate> {
+    let mirrored = match op {
+        Operator::Eq => Operator::Eq,
+        Operator::NotEq => Operator::NotEq,
+        Operator::Lt => Operator::Gt,
+        Operator::LtEq => Operator::GtEq,
+        Operator::Gt => Operator::Lt,
+        Operator::GtEq => Operator::LtEq,
+        _ => return None,
+    };
+
+    build_comparison_predicate(col_name, &mirrored, value)
+}
+
+/// Maps a DataFusion [`ScalarValue`] onto the matching ORC `PredicateValue`.
+///
+/// Boolean, Int8/16/32/64, Float32/64, Utf8 and LargeUtf8 are supported (both
+/// string kinds map to `PredicateValue::Utf8`); the inner `Option` is carried
+/// over unchanged. Any other scalar type yields `None`.
+fn convert_scalar_value(value: &ScalarValue) -> Option<PredicateValue> {
+    let converted = match value {
+        ScalarValue::Boolean(flag) => PredicateValue::Boolean(*flag),
+        ScalarValue::Int8(num) => PredicateValue::Int8(*num),
+        ScalarValue::Int16(num) => PredicateValue::Int16(*num),
+        ScalarValue::Int32(num) => PredicateValue::Int32(*num),
+        ScalarValue::Int64(num) => PredicateValue::Int64(*num),
+        ScalarValue::Float32(num) => PredicateValue::Float32(*num),
+        ScalarValue::Float64(num) => PredicateValue::Float64(*num),
+        ScalarValue::Utf8(text) | ScalarValue::LargeUtf8(text) => {
+            PredicateValue::Utf8(text.clone())
+        }
+        _ => return None,
+    };
+
+    Some(converted)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::{
+        logical_expr::Operator,
+        physical_expr::expressions::{
+            BinaryExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NotExpr,
+        },
+        scalar::ScalarValue,
+    };
+
+    use super::*;
+
+    /// Shorthand for the trait-object expression type used by every builder.
+    type ExprRef = Arc<dyn datafusion::physical_expr::PhysicalExpr>;
+
+    fn create_test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("age", DataType::Int32, true),
+            Field::new("score", DataType::Float64, true),
+        ]))
+    }
+
+    /// Column reference expression.
+    fn col_expr(name: &str, index: usize) -> ExprRef {
+        Arc::new(Column::new(name, index))
+    }
+
+    /// Literal expression wrapping an arbitrary scalar.
+    fn lit_expr(value: ScalarValue) -> ExprRef {
+        Arc::new(Literal::new(value))
+    }
+
+    /// Non-null Int32 literal.
+    fn int32_lit(value: i32) -> ExprRef {
+        lit_expr(ScalarValue::Int32(Some(value)))
+    }
+
+    /// Non-null Utf8 literal.
+    fn utf8_lit(value: &str) -> ExprRef {
+        lit_expr(ScalarValue::Utf8(Some(value.to_string())))
+    }
+
+    /// Binary expression `left <op> right`.
+    fn binary_expr(left: ExprRef, op: Operator, right: ExprRef) -> ExprRef {
+        Arc::new(BinaryExpr::new(left, op, right))
+    }
+
+    /// Runs the conversion under test.
+    fn convert(expr: ExprRef, schema: &SchemaRef) -> Option<Predicate> {
+        convert_predicate_to_orc(Some(expr), schema)
+    }
+
+    /// Converts `expr` and returns the `Debug` rendering of the predicate,
+    /// failing the test if the expression does not convert.
+    fn converted_debug(expr: ExprRef, schema: &SchemaRef) -> String {
+        let predicate =
+            convert(expr, schema).expect("expression should convert to an ORC predicate");
+        format!("{:?}", predicate)
+    }
+
+    #[test]
+    fn test_literal_true() {
+        let schema = create_test_schema();
+        let expr = lit_expr(ScalarValue::Boolean(Some(true)));
+        // WHERE true should return None (no filtering)
+        assert!(convert(expr, &schema).is_none());
+    }
+
+    #[test]
+    fn test_literal_false() {
+        let schema = create_test_schema();
+        let expr = lit_expr(ScalarValue::Boolean(Some(false)));
+        // WHERE false should return a predicate that filters all data
+        assert!(convert(expr, &schema).is_some());
+    }
+
+    #[test]
+    fn test_comparison_eq() {
+        let schema = create_test_schema();
+        let expr = binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(42));
+
+        assert_eq!(
+            converted_debug(expr, &schema),
+            "Comparison { column: \"id\", op: Equal, value: Int32(Some(42)) }"
+        );
+    }
+
+    #[test]
+    fn test_comparison_ne() {
+        let schema = create_test_schema();
+        let expr = binary_expr(col_expr("name", 1), Operator::NotEq, utf8_lit("test"));
+
+        assert_eq!(
+            converted_debug(expr, &schema),
+            "Comparison { column: \"name\", op: NotEqual, value: Utf8(Some(\"test\")) }"
+        );
+    }
+
+    #[test]
+    fn test_comparison_lt_gt_lte_gte() {
+        let schema = create_test_schema();
+
+        for op in [Operator::Lt, Operator::Gt, Operator::LtEq, Operator::GtEq] {
+            let expr = binary_expr(col_expr("age", 2), op, int32_lit(30));
+            assert!(
+                convert(expr, &schema).is_some(),
+                "{:?} should convert",
+                op
+            );
+        }
+    }
+
+    #[test]
+    fn test_comparison_reversed() {
+        let schema = create_test_schema();
+        // Literal on left: 42 = id (should be reversed to: id = 42)
+        let expr = binary_expr(int32_lit(42), Operator::Eq, col_expr("id", 0));
+
+        assert!(convert(expr, &schema).is_some());
+    }
+
+    #[test]
+    fn test_is_null() {
+        let schema = create_test_schema();
+        let expr: ExprRef = Arc::new(IsNullExpr::new(col_expr("name", 1)));
+
+        assert_eq!(
+            converted_debug(expr, &schema),
+            "IsNull { column: \"name\" }"
+        );
+    }
+
+    #[test]
+    fn test_is_not_null() {
+        let schema = create_test_schema();
+        let expr: ExprRef = Arc::new(IsNotNullExpr::new(col_expr("age", 2)));
+
+        assert_eq!(
+            converted_debug(expr, &schema),
+            "Not(IsNull { column: \"age\" })"
+        );
+    }
+
+    #[test]
+    fn test_not_expr() {
+        let schema = create_test_schema();
+        let eq_expr = binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(42));
+        let not_expr: ExprRef = Arc::new(NotExpr::new(eq_expr));
+
+        assert!(converted_debug(not_expr, &schema).starts_with("Not(Comparison"));
+    }
+
+    #[test]
+    fn test_in_list() {
+        let schema = create_test_schema();
+        let values = vec![int32_lit(1), int32_lit(2), int32_lit(3)];
+        let expr: ExprRef = Arc::new(InListExpr::new(col_expr("id", 0), values, false, None));
+
+        // IN list should be converted to OR of equality predicates
+        assert!(converted_debug(expr, &schema).starts_with("Or(["));
+    }
+
+    #[test]
+    fn test_not_in_list() {
+        let schema = create_test_schema();
+        let values = vec![utf8_lit("foo"), utf8_lit("bar")];
+        let expr: ExprRef = Arc::new(InListExpr::new(col_expr("name", 1), values, true, None));
+
+        // NOT IN should be converted to NOT(OR(...))
+        assert!(converted_debug(expr, &schema).starts_with("Not(Or(["));
+    }
+
+    #[test]
+    fn test_and_simple() {
+        let schema = create_test_schema();
+        // id = 42 AND age > 18
+        let and_expr = binary_expr(
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(42)),
+            Operator::And,
+            binary_expr(col_expr("age", 2), Operator::Gt, int32_lit(18)),
+        );
+
+        assert!(converted_debug(and_expr, &schema).starts_with("And(["));
+    }
+
+    #[test]
+    fn test_and_nested_flattening() {
+        let schema = create_test_schema();
+        // ((id = 1 AND age = 2) AND name = "foo") should be flattened to And([...])
+        let and1 = binary_expr(
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(1)),
+            Operator::And,
+            binary_expr(col_expr("age", 2), Operator::Eq, int32_lit(2)),
+        );
+        let and2 = binary_expr(
+            and1,
+            Operator::And,
+            binary_expr(col_expr("name", 1), Operator::Eq, utf8_lit("foo")),
+        );
+
+        let debug_str = converted_debug(and2, &schema);
+        // Should be flattened to And([cond1, cond2, cond3])
+        assert!(debug_str.starts_with("And(["));
+        // Count the number of conditions (should be 3)
+        assert_eq!(debug_str.matches("Comparison").count(), 3);
+    }
+
+    #[test]
+    fn test_or_simple() {
+        let schema = create_test_schema();
+        // id = 1 OR id = 2
+        let or_expr = binary_expr(
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(1)),
+            Operator::Or,
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(2)),
+        );
+
+        assert!(converted_debug(or_expr, &schema).starts_with("Or(["));
+    }
+
+    #[test]
+    fn test_or_nested_flattening() {
+        let schema = create_test_schema();
+        // ((id = 1 OR age = 2) OR score = 3.0) should be flattened
+        let or1 = binary_expr(
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(1)),
+            Operator::Or,
+            binary_expr(col_expr("age", 2), Operator::Eq, int32_lit(2)),
+        );
+        let or2 = binary_expr(
+            or1,
+            Operator::Or,
+            binary_expr(
+                col_expr("score", 3),
+                Operator::Eq,
+                lit_expr(ScalarValue::Float64(Some(3.0))),
+            ),
+        );
+
+        let debug_str = converted_debug(or2, &schema);
+        // Should be flattened to Or([cond1, cond2, cond3])
+        assert!(debug_str.starts_with("Or(["));
+        assert_eq!(debug_str.matches("Comparison").count(), 3);
+    }
+
+    #[test]
+    fn test_complex_mixed_predicates() {
+        let schema = create_test_schema();
+        // (id = 1 OR id = 2) AND name IS NOT NULL
+        let or_expr = binary_expr(
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(1)),
+            Operator::Or,
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(2)),
+        );
+        let is_not_null: ExprRef = Arc::new(IsNotNullExpr::new(col_expr("name", 1)));
+        let and_expr = binary_expr(or_expr, Operator::And, is_not_null);
+
+        let debug_str = converted_debug(and_expr, &schema);
+        // Should have And at top level
+        assert!(
+            debug_str.contains("And"),
+            "Expected And, got: {}",
+            debug_str
+        );
+        // Should contain OR for the id conditions
+        assert!(debug_str.contains("Or"), "Expected Or, got: {}", debug_str);
+        // Should contain the IS NOT NULL condition
+        assert!(
+            debug_str.contains("IsNull"),
+            "Expected IsNull, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_deeply_nested_and() {
+        let schema = create_test_schema();
+        // Build: (((id = 1 AND age = 2) AND name = "foo") AND score = 3.0)
+        let and1 = binary_expr(
+            binary_expr(col_expr("id", 0), Operator::Eq, int32_lit(1)),
+            Operator::And,
+            binary_expr(col_expr("age", 2), Operator::Eq, int32_lit(2)),
+        );
+        let and2 = binary_expr(
+            and1,
+            Operator::And,
+            binary_expr(col_expr("name", 1), Operator::Eq, utf8_lit("foo")),
+        );
+        let and3 = binary_expr(
+            and2,
+            Operator::And,
+            binary_expr(
+                col_expr("score", 3),
+                Operator::Eq,
+                lit_expr(ScalarValue::Float64(Some(3.0))),
+            ),
+        );
+
+        let debug_str = converted_debug(and3, &schema);
+        // Should be flattened to And([cond1, cond2, cond3, cond4])
+        assert!(debug_str.starts_with("And(["));
+        assert_eq!(debug_str.matches("Comparison").count(), 4);
+    }
+
+    #[test]
+    fn test_all_scalar_types() {
+        let schema: SchemaRef = Arc::new(Schema::new(vec![
+            Field::new("col_bool", DataType::Boolean, true),
+            Field::new("col_i8", DataType::Int8, true),
+            Field::new("col_i16", DataType::Int16, true),
+            Field::new("col_i32", DataType::Int32, true),
+            Field::new("col_i64", DataType::Int64, true),
+            Field::new("col_f32", DataType::Float32, true),
+            Field::new("col_f64", DataType::Float64, true),
+            Field::new("col_utf8", DataType::Utf8, true),
+        ]));
+
+        // One `column = literal` case per supported scalar type
+        let cases = [
+            ("col_bool", 0, ScalarValue::Boolean(Some(true))),
+            ("col_i8", 1, ScalarValue::Int8(Some(42))),
+            ("col_i16", 2, ScalarValue::Int16(Some(1000))),
+            ("col_i32", 3, ScalarValue::Int32(Some(100000))),
+            ("col_i64", 4, ScalarValue::Int64(Some(1000000000))),
+            ("col_f32", 5, ScalarValue::Float32(Some(3.14))),
+            ("col_f64", 6, ScalarValue::Float64(Some(2.718))),
+            ("col_utf8", 7, ScalarValue::Utf8(Some("test".to_string()))),
+        ];
+
+        for (name, index, value) in cases {
+            let expr = binary_expr(col_expr(name, index), Operator::Eq, lit_expr(value));
+            assert!(
+                convert(expr, &schema).is_some(),
+                "{} should convert",
+                name
+            );
+        }
+    }
+
+    #[test]
+    fn test_unsupported_operator_returns_none() {
+        let schema = create_test_schema();
+        // id + 1 is arithmetic, not a comparison
+        let expr = binary_expr(col_expr("id", 0), Operator::Plus, int32_lit(1));
+
+        assert!(convert(expr, &schema).is_none());
+    }
+
+    #[test]
+    fn test_unsupported_operands_return_none() {
+        let schema = create_test_schema();
+
+        // (id + 1) = 2: the left side is not a bare column
+        let sum = binary_expr(col_expr("id", 0), Operator::Plus, int32_lit(1));
+        let expr = binary_expr(sum, Operator::Eq, int32_lit(2));
+        assert!(convert(expr, &schema).is_none());
+
+        // id = age: column-to-column comparison has no literal side
+        let expr = binary_expr(col_expr("id", 0), Operator::Eq, col_expr("age", 2));
+        assert!(convert(expr, &schema).is_none());
+    }
+
+    #[test]
+    fn test_not_of_unsupported_expression_returns_none() {
+        let schema = create_test_schema();
+        // NOT (id = age): the inner expression cannot be converted
+        let inner = binary_expr(col_expr("id", 0), Operator::Eq, col_expr("age", 2));
+        let expr: ExprRef = Arc::new(NotExpr::new(inner));
+
+        assert!(convert(expr, &schema).is_none());
+    }
+
+    #[test]
+    fn test_is_null_on_non_column_returns_none() {
+        let schema = create_test_schema();
+        // (id + 1) IS NULL: IS NULL is only supported directly on a column
+        let sum = binary_expr(col_expr("id", 0), Operator::Plus, int32_lit(1));
+        let expr: ExprRef = Arc::new(IsNullExpr::new(sum));
+
+        assert!(convert(expr, &schema).is_none());
+    }
+}
Review Comment:
The test suite lacks coverage for unsupported expressions in predicates.
There are no tests for cases where some sub-expressions cannot be converted to
ORC predicates, such as complex expressions, LIKE patterns, or other operations
not supported by ORC predicate pushdown. Tests should verify that unsupported
predicates are handled correctly and don't silently change filtering semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]