This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 0bb19933 [AURON #1624] Support the pushdown of ORC predicates (#1886)
0bb19933 is described below
commit 0bb19933fa6eae2cca95af27ea91f2d5be339c3f
Author: Graceful <[email protected]>
AuthorDate: Fri Jan 16 16:32:17 2026 +0800
[AURON #1624] Support the pushdown of ORC predicates (#1886)
# Which issue does this PR close?
Closes #1624
# Rationale for this change
Support the push-down of ORC predicates to enhance the reading
efficiency of ORC files
# What changes are included in this PR?
# Are there any user-facing changes?
no
# How was this patch tested?
cluster test
---------
Co-authored-by: duanhao-jk <[email protected]>
---
Cargo.lock | 3 +-
Cargo.toml | 2 +-
native-engine/datafusion-ext-plans/src/orc_exec.rs | 1247 +++++++++++++++++++-
3 files changed, 1240 insertions(+), 12 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 01266312..816d296f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2794,7 +2794,7 @@ checksum =
"42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "orc-rust"
version = "0.7.0"
-source =
"git+https://github.com/auron-project/datafusion-orc.git?rev=919e050#919e0509b2b4b5a3bb70eea4b34633d3a3c933e8"
+source =
"git+https://github.com/auron-project/datafusion-orc.git?rev=17f7012#17f7012bbd82cbe9967d99fb49aa0069306537fd"
dependencies = [
"arrow",
"async-trait",
@@ -2807,6 +2807,7 @@ dependencies = [
"futures",
"futures-util",
"iana-time-zone",
+ "log",
"lz4_flex 0.11.5",
"lzokay-native",
"num",
diff --git a/Cargo.toml b/Cargo.toml
index 47470c13..e751b3a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -121,7 +121,7 @@ datafusion-execution = { git =
"https://github.com/auron-project/datafusion.git"
datafusion-optimizer = { git =
"https://github.com/auron-project/datafusion.git", rev = "9034aeffb"}
datafusion-physical-expr = { git =
"https://github.com/auron-project/datafusion.git", rev = "9034aeffb"}
datafusion-spark = { git = "https://github.com/auron-project/datafusion.git",
rev = "9034aeffb"}
-orc-rust = { git = "https://github.com/auron-project/datafusion-orc.git", rev
= "919e050"}
+orc-rust = { git = "https://github.com/auron-project/datafusion-orc.git", rev
= "17f7012"}
# arrow: branch=v55.2.0-blaze
arrow = { git = "https://github.com/auron-project/arrow-rs.git", rev =
"5de02520c"}
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs
b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index c53cb6b5..965b53e9 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -29,13 +29,21 @@ use datafusion::{
},
error::Result,
execution::context::TaskContext,
- physical_expr::{EquivalenceProperties, PhysicalExprRef},
+ logical_expr::Operator,
+ physical_expr::{
+ EquivalenceProperties, PhysicalExprRef,
+ expressions::{
+ BinaryExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, NotExpr, SCAndExpr,
+ SCOrExpr,
+ },
+ },
physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties,
SendableRecordBatchStream, Statistics,
execution_plan::{Boundedness, EmissionType},
metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
},
+ scalar::ScalarValue,
};
use datafusion_datasource::PartitionedFile;
use datafusion_ext_commons::{batch_size, df_execution_err,
hadoop_fs::FsProvider};
@@ -45,6 +53,7 @@ use once_cell::sync::OnceCell;
use orc_rust::{
TimestampPrecision,
arrow_reader::ArrowReaderBuilder,
+ predicate::{Predicate, PredicateValue},
projection::ProjectionMask,
reader::{AsyncChunkReader, metadata::FileMetadata},
};
@@ -62,7 +71,7 @@ pub struct OrcExec {
projected_statistics: Statistics,
projected_schema: SchemaRef,
metrics: ExecutionPlanMetricsSet,
- _predicate: Option<PhysicalExprRef>,
+ predicate: Option<PhysicalExprRef>,
props: OnceCell<PlanProperties>,
}
@@ -72,7 +81,7 @@ impl OrcExec {
pub fn new(
base_config: FileScanConfig,
fs_resource_id: String,
- _predicate: Option<PhysicalExprRef>,
+ predicate: Option<PhysicalExprRef>,
) -> Self {
let metrics = ExecutionPlanMetricsSet::new();
@@ -85,7 +94,7 @@ impl OrcExec {
projected_statistics,
projected_schema,
metrics,
- _predicate,
+ predicate,
props: OnceCell::new(),
}
}
@@ -96,11 +105,12 @@ impl DisplayAs for OrcExec {
let limit = self.base_config.limit;
let projection = self.base_config.projection.clone();
let file_group = &self.base_config.file_groups;
+ let pred = &self.predicate;
write!(
f,
- "OrcExec: file_group={:?}, limit={:?}, projection={:?}",
- file_group, limit, projection
+ "OrcExec: file_group={:?}, limit={:?}, projection={:?},
predicate={:?}",
+ file_group, limit, projection, pred
)
}
}
@@ -172,6 +182,7 @@ impl ExecutionPlan for OrcExec {
force_positional_evolution,
use_microsecond_precision,
is_case_sensitive,
+ predicate: self.predicate.clone(),
});
let file_stream = Box::pin(FileStream::new(
@@ -220,6 +231,7 @@ struct OrcOpener {
force_positional_evolution: bool,
use_microsecond_precision: bool,
is_case_sensitive: bool,
+ predicate: Option<PhysicalExprRef>,
}
impl FileOpener for OrcOpener {
@@ -244,11 +256,12 @@ impl FileOpener for OrcOpener {
let projected_schema =
SchemaRef::from(self.table_schema.project(&projection)?);
let schema_adapter = SchemaAdapter::new(
self.table_schema.clone(),
- projected_schema,
+ projected_schema.clone(),
self.force_positional_evolution,
);
let use_microsecond = self.use_microsecond_precision;
let is_case = self.is_case_sensitive;
+ let predicate = self.predicate.clone();
Ok(Box::pin(async move {
let mut builder = ArrowReaderBuilder::try_new_async(reader)
@@ -267,10 +280,14 @@ impl FileOpener for OrcOpener {
let projection_mask =
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
- let stream = builder
+ builder = builder
.with_batch_size(batch_size)
- .with_projection(projection_mask)
- .build_async();
+ .with_projection(projection_mask);
+ if let Some(orc_predicate) = convert_predicate_to_orc(predicate,
&projected_schema) {
+ builder = builder.with_predicate(orc_predicate);
+ }
+
+ let stream = builder.build_async();
let adapted = stream
.map_err(|e| ArrowError::ExternalError(Box::new(e)))
@@ -414,3 +431,1213 @@ impl OrcFileMetrics {
Self { bytes_scanned }
}
}
+
+/// Translates the optional DataFusion filter into an ORC reader predicate.
+///
+/// Returns `None` when no filter was supplied, or when `convert_expr_to_orc`
+/// cannot express the filter as an ORC predicate.
+fn convert_predicate_to_orc(
+    predicate: Option<PhysicalExprRef>,
+    file_schema: &SchemaRef,
+) -> Option<Predicate> {
+    predicate
+        .as_ref()
+        .and_then(|filter| convert_expr_to_orc(filter, file_schema))
+}
+
+/// Flattens a (possibly nested) conjunction into `predicates`.
+///
+/// Both `SCAndExpr` nodes and `BinaryExpr` nodes whose operator is
+/// `Operator::And` are descended into. Every other expression is handed to
+/// `convert_expr_to_orc` and appended only when that conversion succeeds.
+fn collect_and_predicates(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+    predicates: &mut Vec<Predicate>,
+) {
+    let node = expr.as_any();
+
+    // Resolve both operands when this node is a conjunction of either flavour.
+    let operands = if let Some(sc_and) = node.downcast_ref::<SCAndExpr>() {
+        Some((&sc_and.left, &sc_and.right))
+    } else {
+        node.downcast_ref::<BinaryExpr>()
+            .filter(|binary| matches!(binary.op(), Operator::And))
+            .map(|binary| (binary.left(), binary.right()))
+    };
+
+    match operands {
+        Some((lhs, rhs)) => {
+            collect_and_predicates(lhs, schema, predicates);
+            collect_and_predicates(rhs, schema, predicates);
+        }
+        // Leaf of the conjunction (OR, comparison, IS NULL, ...): keep it only
+        // if it can be converted.
+        None => predicates.extend(convert_expr_to_orc(expr, schema)),
+    }
+}
+
+/// Recursively collects the branches of a (possibly nested) OR expression.
+///
+/// `SCOrExpr` nodes and `BinaryExpr` nodes whose operator is `Operator::Or`
+/// are descended into; every other expression is converted with
+/// `convert_expr_to_orc` and appended to `predicates`.
+///
+/// Returns `true` when every branch was converted. Returns `false` as soon as
+/// one branch cannot be converted; `predicates` is then incomplete and must be
+/// discarded. Dropping a branch of an OR would make the pushed-down filter
+/// stricter than the original one, so rows matching only the dropped branch
+/// would be lost.
+fn collect_or_predicates(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+    predicates: &mut Vec<Predicate>,
+) -> bool {
+    // Handle short-circuit OR expression (SCOrExpr)
+    if let Some(sc_or) = expr.as_any().downcast_ref::<SCOrExpr>() {
+        return collect_or_predicates(&sc_or.left, schema, predicates)
+            && collect_or_predicates(&sc_or.right, schema, predicates);
+    }
+
+    // Handle BinaryExpr with OR operator
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        if matches!(binary.op(), Operator::Or) {
+            return collect_or_predicates(binary.left(), schema, predicates)
+                && collect_or_predicates(binary.right(), schema, predicates);
+        }
+    }
+
+    // Not an OR expression: convert the whole branch
+    // (could be AND, comparison, IS NULL, etc.).
+    match convert_expr_to_orc(expr, schema) {
+        Some(pred) => {
+            predicates.push(pred);
+            true
+        }
+        None => false,
+    }
+}
+
+/// Converts a DataFusion physical expression into an ORC predicate.
+///
+/// * AND (`SCAndExpr` or `BinaryExpr` with `Operator::And`): nested
+///   conjunctions are flattened. Conjuncts that cannot be converted are
+///   dropped, which only widens the set of rows the predicate accepts.
+/// * OR (`SCOrExpr` or `BinaryExpr` with `Operator::Or`): nested disjunctions
+///   are flattened. The result is `None` unless every branch converts, since
+///   a partial OR would reject rows the original filter accepts.
+/// * Anything else is delegated to `convert_expr_to_orc_internal`.
+///
+/// A flattened list with a single entry is returned as that entry rather than
+/// being wrapped in `Predicate::and` / `Predicate::or`.
+fn convert_expr_to_orc(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Option<Predicate> {
+    let node = expr.as_any();
+    let binary_op = node.downcast_ref::<BinaryExpr>().map(|binary| binary.op());
+
+    let is_and = node.is::<SCAndExpr>() || matches!(binary_op, Some(Operator::And));
+    if is_and {
+        let mut predicates = Vec::new();
+        collect_and_predicates(expr, schema, &mut predicates);
+        return match predicates.len() {
+            0 => None,
+            1 => predicates.pop(),
+            _ => Some(Predicate::and(predicates)),
+        };
+    }
+
+    let is_or = node.is::<SCOrExpr>() || matches!(binary_op, Some(Operator::Or));
+    if is_or {
+        let mut predicates = Vec::new();
+        if !collect_or_predicates(expr, schema, &mut predicates) {
+            // At least one branch is not expressible: push nothing down.
+            return None;
+        }
+        return match predicates.len() {
+            0 => None,
+            1 => predicates.pop(),
+            _ => Some(Predicate::or(predicates)),
+        };
+    }
+
+    convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+///
+/// Supported shapes:
+/// * boolean literals: `true` (and any other literal) yields `None`; `false`
+///   yields an always-false predicate `col IS NULL AND NOT (col IS NULL)`;
+/// * `NOT expr`, only when `expr` converts without losing any part of it;
+/// * `col IS NULL` / `col IS NOT NULL`;
+/// * `col [NOT] IN (literals...)`, only when every list item is a literal that
+///   `convert_scalar_value` supports;
+/// * comparisons between a column and a literal, in either operand order.
+///
+/// Returns `None` for everything else.
+fn convert_expr_to_orc_internal(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Option<Predicate> {
+    let node = expr.as_any();
+
+    // Handle Literal expressions (WHERE true, WHERE false, etc.)
+    if let Some(lit) = node.downcast_ref::<Literal>() {
+        return match lit.value() {
+            ScalarValue::Boolean(Some(false)) => {
+                // WHERE false: build an impossible condition
+                // (column IS NULL AND column IS NOT NULL) on the first schema
+                // column. If the schema has no columns, fall back to a
+                // synthetic column name.
+                // NOTE(review): whether the ORC reader accepts a predicate on
+                // a column that does not exist in the file is not visible
+                // here; confirm against the reader.
+                let col_name = schema
+                    .fields()
+                    .first()
+                    .map(|field| field.name().as_str())
+                    .unwrap_or("__orc_where_false_constant__");
+                Some(Predicate::and(vec![
+                    Predicate::is_null(col_name),
+                    Predicate::not(Predicate::is_null(col_name)),
+                ]))
+            }
+            // WHERE true needs no filtering; other literals are unsupported.
+            _ => None,
+        };
+    }
+
+    // Handle NOT expressions (WHERE NOT condition)
+    if let Some(not_expr) = node.downcast_ref::<NotExpr>() {
+        let inner = not_expr.arg();
+        // The conversion of `inner` may silently drop AND conjuncts it cannot
+        // express. Negating such a widened predicate would narrow the filter
+        // and discard matching rows, so NOT is pushed down only when `inner`
+        // converts exactly.
+        if !is_exactly_convertible(inner) {
+            return None;
+        }
+        return convert_expr_to_orc(inner, schema).map(|pred| Predicate::not(pred));
+    }
+
+    // Handle IS NULL expressions
+    if let Some(is_null) = node.downcast_ref::<IsNullExpr>() {
+        let col = is_null.arg().as_any().downcast_ref::<Column>()?;
+        return Some(Predicate::is_null(col.name()));
+    }
+
+    // Handle IS NOT NULL expressions
+    if let Some(is_not_null) = node.downcast_ref::<IsNotNullExpr>() {
+        let col = is_not_null.arg().as_any().downcast_ref::<Column>()?;
+        return Some(Predicate::not(Predicate::is_null(col.name())));
+    }
+
+    // Handle IN expressions (WHERE col IN (val1, val2, ...))
+    if let Some(in_list) = node.downcast_ref::<InListExpr>() {
+        let col = in_list.expr().as_any().downcast_ref::<Column>()?;
+        let col_name = col.name();
+
+        // Convert IN to col = val1 OR col = val2 OR ...
+        // Every item must convert: silently skipping one would turn
+        // `col IN (a, b)` into `col = a` and lose the rows equal to `b`.
+        let equalities = in_list
+            .list()
+            .iter()
+            .map(|item| {
+                let lit = item.as_any().downcast_ref::<Literal>()?;
+                let value = convert_scalar_value(lit.value())?;
+                Some(Predicate::eq(col_name, value))
+            })
+            .collect::<Option<Vec<_>>>()?;
+
+        if equalities.is_empty() {
+            return None;
+        }
+
+        let any_of = Predicate::or(equalities);
+        // If negated is true, it represents NOT IN
+        return Some(if in_list.negated() {
+            Predicate::not(any_of)
+        } else {
+            any_of
+        });
+    }
+
+    // Handle BinaryExpr (comparison operations)
+    if let Some(binary) = node.downcast_ref::<BinaryExpr>() {
+        let op = binary.op();
+
+        // AND/OR are already handled at the outer level, skip here
+        if matches!(op, Operator::And | Operator::Or) {
+            return None;
+        }
+
+        let lhs = binary.left().as_any();
+        let rhs = binary.right().as_any();
+
+        // column <op> literal
+        if let (Some(col), Some(lit)) =
+            (lhs.downcast_ref::<Column>(), rhs.downcast_ref::<Literal>())
+        {
+            return build_comparison_predicate(col.name(), op, lit.value());
+        }
+
+        // literal <op> column: the operator has to be mirrored
+        if let (Some(lit), Some(col)) =
+            (lhs.downcast_ref::<Literal>(), rhs.downcast_ref::<Column>())
+        {
+            return build_comparison_predicate_reversed(col.name(), op, lit.value());
+        }
+    }
+
+    None
+}
+
+/// Returns `true` when every node of `expr` has a direct ORC counterpart, so
+/// that converting it drops nothing and the result is safe to negate.
+///
+/// This is deliberately conservative: a literal `true` counts as not
+/// convertible because its conversion yields `None`.
+fn is_exactly_convertible(expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>) -> bool {
+    let node = expr.as_any();
+
+    if let Some(sc_and) = node.downcast_ref::<SCAndExpr>() {
+        return is_exactly_convertible(&sc_and.left) && is_exactly_convertible(&sc_and.right);
+    }
+    if let Some(sc_or) = node.downcast_ref::<SCOrExpr>() {
+        return is_exactly_convertible(&sc_or.left) && is_exactly_convertible(&sc_or.right);
+    }
+    if let Some(not_expr) = node.downcast_ref::<NotExpr>() {
+        return is_exactly_convertible(not_expr.arg());
+    }
+    if let Some(lit) = node.downcast_ref::<Literal>() {
+        return matches!(lit.value(), ScalarValue::Boolean(Some(false)));
+    }
+    if let Some(is_null) = node.downcast_ref::<IsNullExpr>() {
+        return is_null.arg().as_any().is::<Column>();
+    }
+    if let Some(is_not_null) = node.downcast_ref::<IsNotNullExpr>() {
+        return is_not_null.arg().as_any().is::<Column>();
+    }
+    if let Some(in_list) = node.downcast_ref::<InListExpr>() {
+        return in_list.expr().as_any().is::<Column>()
+            && !in_list.list().is_empty()
+            && in_list.list().iter().all(|item| {
+                item.as_any()
+                    .downcast_ref::<Literal>()
+                    .is_some_and(|lit| convert_scalar_value(lit.value()).is_some())
+            });
+    }
+    if let Some(binary) = node.downcast_ref::<BinaryExpr>() {
+        return match binary.op() {
+            Operator::And | Operator::Or => {
+                is_exactly_convertible(binary.left()) && is_exactly_convertible(binary.right())
+            }
+            Operator::Eq
+            | Operator::NotEq
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::Gt
+            | Operator::GtEq => {
+                let lhs = binary.left().as_any();
+                let rhs = binary.right().as_any();
+                // Exactly one side must be a column and the other a literal.
+                let literal = if lhs.is::<Column>() {
+                    rhs.downcast_ref::<Literal>()
+                } else if rhs.is::<Column>() {
+                    lhs.downcast_ref::<Literal>()
+                } else {
+                    None
+                };
+                literal.is_some_and(|lit| convert_scalar_value(lit.value()).is_some())
+            }
+            _ => false,
+        };
+    }
+
+    false
+}
+
+/// Builds the ORC predicate for `col_name <op> value`.
+///
+/// Returns `None` when `value` has no `PredicateValue` counterpart or when
+/// `op` is not one of the six comparison operators.
+fn build_comparison_predicate(
+    col_name: &str,
+    op: &Operator,
+    value: &ScalarValue,
+) -> Option<Predicate> {
+    let rhs = convert_scalar_value(value)?;
+
+    let comparison = match op {
+        Operator::Eq => Predicate::eq(col_name, rhs),
+        Operator::NotEq => Predicate::ne(col_name, rhs),
+        Operator::Lt => Predicate::lt(col_name, rhs),
+        Operator::LtEq => Predicate::lte(col_name, rhs),
+        Operator::Gt => Predicate::gt(col_name, rhs),
+        Operator::GtEq => Predicate::gte(col_name, rhs),
+        _ => return None,
+    };
+    Some(comparison)
+}
+
+/// Builds the ORC predicate for `value <op> col_name`, i.e. a comparison
+/// whose literal is on the left-hand side.
+///
+/// The operator is mirrored (`<` becomes `>`, `<=` becomes `>=`, and vice
+/// versa; `=` and `!=` are symmetric) and the column-first form is then built
+/// by `build_comparison_predicate`. Returns `None` for any other operator or
+/// when `value` cannot be converted.
+fn build_comparison_predicate_reversed(
+    col_name: &str,
+    op: &Operator,
+    value: &ScalarValue,
+) -> Option<Predicate> {
+    let mirrored = match op {
+        Operator::Eq => Operator::Eq,
+        Operator::NotEq => Operator::NotEq,
+        Operator::Lt => Operator::Gt,
+        Operator::LtEq => Operator::GtEq,
+        Operator::Gt => Operator::Lt,
+        Operator::GtEq => Operator::LtEq,
+        _ => return None,
+    };
+
+    build_comparison_predicate(col_name, &mirrored, value)
+}
+
+/// Maps a DataFusion `ScalarValue` onto the matching ORC `PredicateValue`.
+///
+/// Booleans, 8/16/32/64-bit signed integers, 32/64-bit floats and strings
+/// (`Utf8` and `LargeUtf8`, both mapped to `PredicateValue::Utf8`) are
+/// supported; the inner `Option` is carried over unchanged. Every other
+/// variant yields `None`.
+fn convert_scalar_value(value: &ScalarValue) -> Option<PredicateValue> {
+    let converted = match value {
+        ScalarValue::Boolean(b) => PredicateValue::Boolean(*b),
+        ScalarValue::Int8(n) => PredicateValue::Int8(*n),
+        ScalarValue::Int16(n) => PredicateValue::Int16(*n),
+        ScalarValue::Int32(n) => PredicateValue::Int32(*n),
+        ScalarValue::Int64(n) => PredicateValue::Int64(*n),
+        ScalarValue::Float32(x) => PredicateValue::Float32(*x),
+        ScalarValue::Float64(x) => PredicateValue::Float64(*x),
+        ScalarValue::Utf8(s) | ScalarValue::LargeUtf8(s) => PredicateValue::Utf8(s.clone()),
+        _ => return None,
+    };
+    Some(converted)
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::{
+ logical_expr::Operator,
+ physical_expr::expressions::{
+ BinaryExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Literal, NotExpr, SCAndExpr,
+ SCOrExpr,
+ },
+ scalar::ScalarValue,
+ };
+
+ use super::*;
+
+ fn create_test_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, true),
+ Field::new("age", DataType::Int32, true),
+ Field::new("score", DataType::Float64, true),
+ ]))
+ }
+
+ #[test]
+ fn test_literal_true() {
+ let schema = create_test_schema();
+ let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ // WHERE true should return None (no filtering)
+ assert!(result.is_none());
+ }
+
+ #[test]
+ fn test_literal_false() {
+ let schema = create_test_schema();
+ let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ // WHERE false should return a predicate that filters all data
+ assert!(result.is_some());
+ }
+
+ #[test]
+ fn test_comparison_eq() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("id", 0));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(
+ format!("{:?}", predicate),
+ "Comparison { column: \"id\", op: Equal, value: Int32(Some(42)) }"
+ );
+ }
+
+ #[test]
+ fn test_comparison_ne() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("name", 1));
+ let lit =
Arc::new(Literal::new(ScalarValue::Utf8(Some("test".to_string()))));
+ let expr = Arc::new(BinaryExpr::new(col, Operator::NotEq, lit));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(
+ format!("{:?}", predicate),
+ "Comparison { column: \"name\", op: NotEqual, value:
Utf8(Some(\"test\")) }"
+ );
+ }
+
+ #[test]
+ fn test_comparison_lt_gt_lte_gte() {
+ let schema = create_test_schema();
+
+ // Test LT
+ let col = Arc::new(Column::new("age", 2));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(30))));
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Lt,
lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test GT
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Gt,
lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test LtEq
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::LtEq,
lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test GtEq
+ let expr = Arc::new(BinaryExpr::new(col, Operator::GtEq, lit));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ }
+
+ #[test]
+ fn test_comparison_reversed() {
+ let schema = create_test_schema();
+
+ // Test all reversed comparison operators
+ // Format: (operator, expected_debug_string_fragment)
+
+ // Symmetric operators (Eq, NotEq) - order doesn't change semantics
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let col = Arc::new(Column::new("id", 0));
+ let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::Eq,
col.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ // 42 = id → id = 42
+
+ let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::NotEq,
col.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ // 42 != id → id != 42
+
+ // Asymmetric operators - must be reversed
+
+ // Test: 42 < id → id > 42
+ let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::Lt,
col.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be GreaterThan, not LessThan
+ assert!(
+ debug_str.contains("GreaterThan") || debug_str.contains("Gt"),
+ "Expected GreaterThan for reversed Lt, got: {}",
+ debug_str
+ );
+
+ // Test: 42 <= id → id >= 42
+ let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::LtEq,
col.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ assert!(
+ debug_str.contains("GreaterThanOrEqual")
+ || debug_str.contains("GreaterThanEquals")
+ || debug_str.contains("Gte"),
+ "Expected GreaterThanOrEqual for reversed LtEq, got: {}",
+ debug_str
+ );
+
+ // Test: 42 > id → id < 42
+ let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::Gt,
col.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ assert!(
+ debug_str.contains("LessThan") || debug_str.contains("Lt"),
+ "Expected LessThan for reversed Gt, got: {}",
+ debug_str
+ );
+
+ // Test: 42 >= id → id <= 42
+ let expr = Arc::new(BinaryExpr::new(lit, Operator::GtEq, col));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ assert!(
+ debug_str.contains("LessThanOrEqual")
+ || debug_str.contains("LessThanEquals")
+ || debug_str.contains("Lte"),
+ "Expected LessThanOrEqual for reversed GtEq, got: {}",
+ debug_str
+ );
+ }
+
+ #[test]
+ fn test_is_null() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("name", 1));
+ let expr = Arc::new(IsNullExpr::new(col));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(format!("{:?}", predicate), "IsNull { column: \"name\" }");
+ }
+
+ #[test]
+ fn test_is_not_null() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("age", 2));
+ let expr = Arc::new(IsNotNullExpr::new(col));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert_eq!(
+ format!("{:?}", predicate),
+ "Not(IsNull { column: \"age\" })"
+ );
+ }
+
+ #[test]
+ fn test_not_expr() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("id", 0));
+ let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let eq_expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+ let not_expr = Arc::new(NotExpr::new(eq_expr));
+
+ let result = convert_predicate_to_orc(Some(not_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert!(format!("{:?}", predicate).starts_with("Not(Comparison"));
+ }
+
+ #[test]
+ fn test_in_list() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("id", 0));
+ let values = vec![
+ Arc::new(Literal::new(ScalarValue::Int32(Some(1))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(2))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ Arc::new(Literal::new(ScalarValue::Int32(Some(3))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ ];
+ let expr = Arc::new(InListExpr::new(col, values, false, None));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ // IN list should be converted to OR of equality predicates
+ assert!(format!("{:?}", predicate).starts_with("Or(["));
+ }
+
+ #[test]
+ fn test_not_in_list() {
+ let schema = create_test_schema();
+ let col = Arc::new(Column::new("name", 1));
+ let values = vec![
+ Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ Arc::new(Literal::new(ScalarValue::Utf8(Some("bar".to_string()))))
+ as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+ ];
+ let expr = Arc::new(InListExpr::new(col, values, true, None));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ // NOT IN should be converted to NOT(OR(...))
+ assert!(format!("{:?}", predicate).starts_with("Not(Or(["));
+ }
+
+ #[test]
+ fn test_and_simple() {
+ let schema = create_test_schema();
+ // id = 42 AND age > 18
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+ let and_expr = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+ let result = convert_predicate_to_orc(Some(and_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert!(format!("{:?}", predicate).starts_with("And(["));
+ }
+
+ #[test]
+ fn test_and_nested_flattening() {
+ let schema = create_test_schema();
+ // ((id = 1 AND age = 2) AND name = "foo") should be flattened to
And([...])
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+ let and1 = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+ let col3 = Arc::new(Column::new("name", 1));
+ let lit3 =
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))));
+ let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+ let and2 = Arc::new(BinaryExpr::new(and1, Operator::And, expr3));
+
+ let result = convert_predicate_to_orc(Some(and2), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be flattened to And([cond1, cond2, cond3])
+ assert!(debug_str.starts_with("And(["));
+ // Count the number of conditions (should be 3)
+ let condition_count = debug_str.matches("Comparison").count();
+ assert_eq!(condition_count, 3);
+ }
+
+ #[test]
+ fn test_or_simple() {
+ let schema = create_test_schema();
+ // id = 1 OR id = 2
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq,
lit1));
+
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+ let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+ let result = convert_predicate_to_orc(Some(or_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ assert!(format!("{:?}", predicate).starts_with("Or(["));
+ }
+
+ #[test]
+ fn test_or_nested_flattening() {
+ let schema = create_test_schema();
+ // ((id = 1 OR age = 2) OR score = 3.0) should be flattened
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+ let or1 = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+ let col3 = Arc::new(Column::new("score", 3));
+ let lit3 = Arc::new(Literal::new(ScalarValue::Float64(Some(3.0))));
+ let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+ let or2 = Arc::new(BinaryExpr::new(or1, Operator::Or, expr3));
+
+ let result = convert_predicate_to_orc(Some(or2), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be flattened to Or([cond1, cond2, cond3])
+ assert!(debug_str.starts_with("Or(["));
+ let condition_count = debug_str.matches("Comparison").count();
+ assert_eq!(condition_count, 3);
+ }
+
+ #[test]
+ fn test_complex_mixed_predicates() {
+ let schema = create_test_schema();
+ // (id = 1 OR id = 2) AND name IS NOT NULL
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq,
lit1));
+
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+ let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+ let col2 = Arc::new(Column::new("name", 1));
+ let is_not_null = Arc::new(IsNotNullExpr::new(col2));
+
+ let and_expr = Arc::new(BinaryExpr::new(or_expr, Operator::And,
is_not_null));
+
+ let result = convert_predicate_to_orc(Some(and_expr), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should have And at top level
+ assert!(
+ debug_str.contains("And"),
+ "Expected And, got: {}",
+ debug_str
+ );
+ // Should contain OR for the id conditions
+ assert!(debug_str.contains("Or"), "Expected Or, got: {}", debug_str);
+ // Should contain the IS NOT NULL condition
+ assert!(
+ debug_str.contains("IsNull"),
+ "Expected IsNull, got: {}",
+ debug_str
+ );
+ }
+
+ #[test]
+ fn test_deeply_nested_and() {
+ let schema = create_test_schema();
+ // Build: (((id = 1 AND age = 2) AND name = "foo") AND score = 3.0)
+ let col1 = Arc::new(Column::new("id", 0));
+ let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+ let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+ let col2 = Arc::new(Column::new("age", 2));
+ let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+ let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+ let and1 = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+ let col3 = Arc::new(Column::new("name", 1));
+ let lit3 =
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))));
+ let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+ let and2 = Arc::new(BinaryExpr::new(and1, Operator::And, expr3));
+
+ let col4 = Arc::new(Column::new("score", 3));
+ let lit4 = Arc::new(Literal::new(ScalarValue::Float64(Some(3.0))));
+ let expr4 = Arc::new(BinaryExpr::new(col4, Operator::Eq, lit4));
+
+ let and3 = Arc::new(BinaryExpr::new(and2, Operator::And, expr4));
+
+ let result = convert_predicate_to_orc(Some(and3), &schema);
+ assert!(result.is_some());
+ let predicate = result.unwrap();
+ let debug_str = format!("{:?}", predicate);
+ // Should be flattened to And([cond1, cond2, cond3, cond4])
+ assert!(debug_str.starts_with("And(["));
+ let condition_count = debug_str.matches("Comparison").count();
+ assert_eq!(condition_count, 4);
+ }
+
+ /// An equality predicate against a literal of each supported scalar type
+ /// (bool, i8..i64, f32, f64, utf8) must convert to an ORC predicate.
+ #[test]
+ fn test_all_scalar_types() {
+     // One nullable column per scalar type exercised below.
+     let schema = Arc::new(Schema::new(vec![
+         Field::new("col_bool", DataType::Boolean, true),
+         Field::new("col_i8", DataType::Int8, true),
+         Field::new("col_i16", DataType::Int16, true),
+         Field::new("col_i32", DataType::Int32, true),
+         Field::new("col_i64", DataType::Int64, true),
+         Field::new("col_f32", DataType::Float32, true),
+         Field::new("col_f64", DataType::Float64, true),
+         Field::new("col_utf8", DataType::Utf8, true),
+     ]));
+
+     // (column name, literal of the matching type). The float literals are
+     // deliberately not look-alikes of PI / E, which clippy's deny-by-default
+     // `approx_constant` lint rejects.
+     let test_cases = vec![
+         ("col_bool", ScalarValue::Boolean(Some(true))),
+         ("col_i8", ScalarValue::Int8(Some(42))),
+         ("col_i16", ScalarValue::Int16(Some(1000))),
+         ("col_i32", ScalarValue::Int32(Some(100000))),
+         ("col_i64", ScalarValue::Int64(Some(1000000000))),
+         ("col_f32", ScalarValue::Float32(Some(1.25))),
+         ("col_f64", ScalarValue::Float64(Some(2.5))),
+         ("col_utf8", ScalarValue::Utf8(Some("test".to_string()))),
+     ];
+
+     for (col_name, value) in test_cases {
+         // Resolve the column index from the schema so the table cannot drift
+         // out of sync with the field order above.
+         let col = Arc::new(Column::new(col_name, schema.index_of(col_name).unwrap()));
+         let lit = Arc::new(Literal::new(value));
+         let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+
+         assert!(
+             convert_predicate_to_orc(Some(expr), &schema).is_some(),
+             "Failed to convert equality predicate for column: {}",
+             col_name
+         );
+     }
+ }
+
+ #[test]
+ fn test_null_literal_in_comparison() {
+ let schema = create_test_schema();
+
+ // Test: WHERE id = NULL
+ // Note: In SQL semantics, "col = NULL" always evaluates to NULL (not
true or
+ // false) However, we should still handle it gracefully
+ let col = Arc::new(Column::new("id", 0));
+ let null_lit = Arc::new(Literal::new(ScalarValue::Int32(None)));
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Eq,
null_lit.clone()));
+
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ // Should convert to an ORC predicate (even though semantically it
won't match
+ // anything)
+ assert!(result.is_some());
+
+ // Test: WHERE id != NULL
+ let expr = Arc::new(BinaryExpr::new(
+ col.clone(),
+ Operator::NotEq,
+ null_lit.clone(),
+ ));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test: WHERE id < NULL
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Lt,
null_lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test: WHERE id > NULL
+ let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Gt,
null_lit.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test: WHERE NULL = id (reversed)
+ let expr = Arc::new(BinaryExpr::new(null_lit.clone(), Operator::Eq,
col.clone()));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+
+ // Test: WHERE NULL < id (reversed)
+ let expr = Arc::new(BinaryExpr::new(null_lit, Operator::Lt, col));
+ let result = convert_predicate_to_orc(Some(expr), &schema);
+ assert!(result.is_some());
+ }
+
+ /// `id IN (1, NULL, 3)` must convert to a predicate whose debug output
+ /// starts with `Or([`.
+ #[test]
+ fn test_null_in_in_list() {
+     let schema = create_test_schema();
+     let id_col = Arc::new(Column::new("id", 0));
+
+     // IN-list members 1, NULL, 3 as type-erased literal expressions.
+     let values = vec![Some(1), None, Some(3)]
+         .into_iter()
+         .map(|item| {
+             Arc::new(Literal::new(ScalarValue::Int32(item)))
+                 as Arc<dyn datafusion::physical_expr::PhysicalExpr>
+         })
+         .collect::<Vec<_>>();
+
+     // WHERE id IN (1, NULL, 3) -- the NULL member must be handled gracefully.
+     let in_list = Arc::new(InListExpr::new(id_col, values, false, None));
+
+     let result = convert_predicate_to_orc(Some(in_list), &schema);
+     assert!(result.is_some());
+
+     // Expected shape: an OR of equality predicates, NULL entry included
+     // (even though it cannot match).
+     let debug_str = format!("{:?}", result.unwrap());
+     assert!(debug_str.starts_with("Or(["));
+ }
+
+ /// `name NOT IN ('foo', NULL, 'bar')` must convert to a predicate whose
+ /// debug output starts with `Not(Or([`.
+ #[test]
+ fn test_null_in_not_in_list() {
+     let schema = create_test_schema();
+     let name_col = Arc::new(Column::new("name", 1));
+
+     // NOT-IN members 'foo', NULL, 'bar' as type-erased literal expressions.
+     let values = vec![Some("foo"), None, Some("bar")]
+         .into_iter()
+         .map(|item| {
+             Arc::new(Literal::new(ScalarValue::Utf8(item.map(String::from))))
+                 as Arc<dyn datafusion::physical_expr::PhysicalExpr>
+         })
+         .collect::<Vec<_>>();
+
+     // WHERE name NOT IN ('foo', NULL, 'bar')
+     let not_in_list = Arc::new(InListExpr::new(name_col, values, true, None));
+
+     let result = convert_predicate_to_orc(Some(not_in_list), &schema);
+     assert!(result.is_some());
+
+     // Expected shape: NOT(OR(...))
+     let debug_str = format!("{:?}", result.unwrap());
+     assert!(debug_str.starts_with("Not(Or(["));
+ }
+
+ /// Edge case: an IN list made only of NULLs, `id IN (NULL, NULL, NULL)`,
+ /// must still convert to a predicate.
+ #[test]
+ fn test_all_null_in_list() {
+     let schema = create_test_schema();
+     let id_col = Arc::new(Column::new("id", 0));
+
+     // Three NULL Int32 literals.
+     let values = (0..3)
+         .map(|_| {
+             Arc::new(Literal::new(ScalarValue::Int32(None)))
+                 as Arc<dyn datafusion::physical_expr::PhysicalExpr>
+         })
+         .collect::<Vec<_>>();
+
+     // WHERE id IN (NULL, NULL, NULL)
+     let in_list = Arc::new(InListExpr::new(id_col, values, false, None));
+
+     // A predicate should still be produced (though it won't match anything).
+     let result = convert_predicate_to_orc(Some(in_list), &schema);
+     assert!(result.is_some());
+ }
+
+ /// A NULL comparison ANDed with a regular comparison must convert to a
+ /// predicate whose debug output mentions `And`.
+ #[test]
+ fn test_null_with_and_predicate() {
+     let schema = create_test_schema();
+
+     // WHERE id = NULL AND age > 18
+     let id_eq_null = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("id", 0)),
+         Operator::Eq,
+         Arc::new(Literal::new(ScalarValue::Int32(None))),
+     ));
+     let age_gt_18 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("age", 2)),
+         Operator::Gt,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(18)))),
+     ));
+     let conjunction = Arc::new(BinaryExpr::new(id_eq_null, Operator::And, age_gt_18));
+
+     let result = convert_predicate_to_orc(Some(conjunction), &schema);
+     assert!(result.is_some());
+
+     // Expected to be flattened to And([...])
+     let debug_str = format!("{:?}", result.unwrap());
+     assert!(debug_str.contains("And"));
+ }
+
+ /// A NULL comparison ORed with a regular comparison must convert to a
+ /// predicate whose debug output mentions `Or`.
+ #[test]
+ fn test_null_with_or_predicate() {
+     let schema = create_test_schema();
+
+     // WHERE id = NULL OR age > 18
+     let id_eq_null = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("id", 0)),
+         Operator::Eq,
+         Arc::new(Literal::new(ScalarValue::Int32(None))),
+     ));
+     let age_gt_18 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("age", 2)),
+         Operator::Gt,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(18)))),
+     ));
+     let disjunction = Arc::new(BinaryExpr::new(id_eq_null, Operator::Or, age_gt_18));
+
+     let result = convert_predicate_to_orc(Some(disjunction), &schema);
+     assert!(result.is_some());
+
+     // Expected to be flattened to Or([...])
+     let debug_str = format!("{:?}", result.unwrap());
+     assert!(debug_str.contains("Or"));
+ }
+
+ /// `column = NULL` must convert for every column of the test schema, using
+ /// a NULL literal of that column's type.
+ #[test]
+ fn test_various_null_types() {
+     let schema = create_test_schema();
+
+     // Column name paired with a typed NULL for that column.
+     let typed_nulls = [
+         ("id", ScalarValue::Int32(None)),
+         ("name", ScalarValue::Utf8(None)),
+         ("age", ScalarValue::Int32(None)),
+         ("score", ScalarValue::Float64(None)),
+     ];
+
+     for (col_name, null_value) in typed_nulls {
+         let index = schema.index_of(col_name).unwrap();
+         let comparison = Arc::new(BinaryExpr::new(
+             Arc::new(Column::new(col_name, index)),
+             Operator::Eq,
+             Arc::new(Literal::new(null_value)),
+         ));
+
+         let converted = convert_predicate_to_orc(Some(comparison), &schema);
+         assert!(
+             converted.is_some(),
+             "Failed to convert NULL comparison for column: {}",
+             col_name
+         );
+     }
+ }
+
+ /// The inclusive comparisons `id >= NULL` and `id <= NULL` must convert to
+ /// an ORC predicate.
+ #[test]
+ fn test_null_literal_edge_cases() {
+     let schema = create_test_schema();
+     let id_col = Arc::new(Column::new("id", 0));
+     let null_lit = Arc::new(Literal::new(ScalarValue::Int32(None)));
+
+     // WHERE id >= NULL, then WHERE id <= NULL
+     for op in [Operator::GtEq, Operator::LtEq] {
+         let expr = Arc::new(BinaryExpr::new(id_col.clone(), op, null_lit.clone()));
+         let result = convert_predicate_to_orc(Some(expr), &schema);
+         assert!(result.is_some());
+     }
+ }
+
+ /// Edge case: a constant `WHERE false` against a schema with no columns
+ /// must still yield a predicate.
+ #[test]
+ fn test_where_false_with_empty_schema() {
+     let empty_schema = Arc::new(Schema::empty());
+     let always_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
+
+     // A predicate is still expected here (via a synthetic column name) so
+     // that all data is filtered out.
+     let result = convert_predicate_to_orc(Some(always_false), &empty_schema);
+     assert!(
+         result.is_some(),
+         "WHERE false should produce a predicate even with empty schema"
+     );
+
+     // Accept either the synthetic column name or an And predicate.
+     let debug_str = format!("{:?}", result.unwrap());
+     let recognised = ["__orc_where_false_constant__", "And"]
+         .iter()
+         .any(|needle| debug_str.contains(*needle));
+     assert!(
+         recognised,
+         "Expected synthetic column or And predicate, got: {}",
+         debug_str
+     );
+ }
+
+ /// Edge case: a constant `WHERE true` against a schema with no columns must
+ /// yield no predicate at all.
+ #[test]
+ fn test_where_true_with_empty_schema() {
+     let empty_schema = Arc::new(Schema::empty());
+     let always_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+
+     // WHERE true means "no filtering", so the conversion should return None.
+     let converted = convert_predicate_to_orc(Some(always_true), &empty_schema);
+     assert!(
+         converted.is_none(),
+         "WHERE true should not produce any predicate"
+     );
+ }
+
+ /// `SCAndExpr` (short-circuit AND) over two comparisons must convert to a
+ /// predicate whose debug output mentions `And`.
+ #[test]
+ fn test_short_circuit_and_expr() {
+     let schema = create_test_schema();
+
+     // Simulating: WHERE id = 42 AND age > 18
+     let id_eq_42 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("id", 0)),
+         Operator::Eq,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
+     ));
+     let age_gt_18 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("age", 2)),
+         Operator::Gt,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(18)))),
+     ));
+     let sc_and_expr = Arc::new(SCAndExpr::new(id_eq_42, age_gt_18));
+
+     let converted = convert_predicate_to_orc(Some(sc_and_expr), &schema);
+     assert!(converted.is_some(), "SCAndExpr should convert to predicate");
+
+     // Expected to be flattened to And([...])
+     let debug_str = format!("{:?}", converted.unwrap());
+     assert!(
+         debug_str.contains("And"),
+         "Expected And predicate, got: {}",
+         debug_str
+     );
+ }
+
+ /// `SCOrExpr` (short-circuit OR) over two comparisons on the same column
+ /// must convert to a predicate whose debug output mentions `Or`.
+ #[test]
+ fn test_short_circuit_or_expr() {
+     let schema = create_test_schema();
+     let id_col = Arc::new(Column::new("id", 0));
+
+     // Builds `id = <value>` against the shared column expression.
+     let id_eq = |value: i32| {
+         Arc::new(BinaryExpr::new(
+             id_col.clone(),
+             Operator::Eq,
+             Arc::new(Literal::new(ScalarValue::Int32(Some(value)))),
+         ))
+     };
+
+     // Simulating: WHERE id = 1 OR id = 2
+     let sc_or_expr = Arc::new(SCOrExpr::new(id_eq(1), id_eq(2)));
+
+     let converted = convert_predicate_to_orc(Some(sc_or_expr), &schema);
+     assert!(converted.is_some(), "SCOrExpr should convert to predicate");
+
+     // Expected to be flattened to Or([...])
+     let debug_str = format!("{:?}", converted.unwrap());
+     assert!(
+         debug_str.contains("Or"),
+         "Expected Or predicate, got: {}",
+         debug_str
+     );
+ }
+
+ /// An `SCOrExpr` nested inside an `SCAndExpr` must convert to a predicate
+ /// whose debug output mentions both `And` and `Or`.
+ #[test]
+ fn test_nested_short_circuit_exprs() {
+     let schema = create_test_schema();
+     let id_col = Arc::new(Column::new("id", 0));
+
+     // Builds `id = <value>` against the shared column expression.
+     let id_eq = |value: i32| {
+         Arc::new(BinaryExpr::new(
+             id_col.clone(),
+             Operator::Eq,
+             Arc::new(Literal::new(ScalarValue::Int32(Some(value)))),
+         ))
+     };
+
+     // Simulating: WHERE (id = 1 OR id = 2) AND age > 18
+     let sc_or_expr = Arc::new(SCOrExpr::new(id_eq(1), id_eq(2)));
+     let age_gt_18 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("age", 2)),
+         Operator::Gt,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(18)))),
+     ));
+     let sc_and_expr = Arc::new(SCAndExpr::new(sc_or_expr, age_gt_18));
+
+     let converted = convert_predicate_to_orc(Some(sc_and_expr), &schema);
+     assert!(
+         converted.is_some(),
+         "Nested SCAndExpr/SCOrExpr should convert to predicate"
+     );
+
+     // Expect And at the top level with Or inside it.
+     let debug_str = format!("{:?}", converted.unwrap());
+     assert!(
+         debug_str.contains("And"),
+         "Expected And at top level, got: {}",
+         debug_str
+     );
+     assert!(
+         debug_str.contains("Or"),
+         "Expected Or inside And, got: {}",
+         debug_str
+     );
+ }
+
+ /// A `BinaryExpr` AND wrapping an `SCAndExpr` must convert to a predicate
+ /// mentioning `And` and containing exactly three comparisons.
+ #[test]
+ fn test_mixed_binary_and_short_circuit() {
+     let schema = create_test_schema();
+
+     // Simulating: WHERE id = 42 AND (age > 18 AND name = 'ACTIVE')
+     // The outer AND is a BinaryExpr; the inner AND is an SCAndExpr.
+     let id_eq_42 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("id", 0)),
+         Operator::Eq,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
+     ));
+     let age_gt_18 = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("age", 2)),
+         Operator::Gt,
+         Arc::new(Literal::new(ScalarValue::Int32(Some(18)))),
+     ));
+     let name_eq_active = Arc::new(BinaryExpr::new(
+         Arc::new(Column::new("name", 1)),
+         Operator::Eq,
+         Arc::new(Literal::new(ScalarValue::Utf8(Some("ACTIVE".to_string())))),
+     ));
+
+     let sc_and_inner = Arc::new(SCAndExpr::new(age_gt_18, name_eq_active));
+     let binary_and_outer = Arc::new(BinaryExpr::new(id_eq_42, Operator::And, sc_and_inner));
+
+     let converted = convert_predicate_to_orc(Some(binary_and_outer), &schema);
+     assert!(
+         converted.is_some(),
+         "Mixed BinaryExpr/SCAndExpr should convert to predicate"
+     );
+
+     // Everything should be flattened to And([...])
+     let debug_str = format!("{:?}", converted.unwrap());
+     assert!(
+         debug_str.contains("And"),
+         "Expected And predicate, got: {}",
+         debug_str
+     );
+
+     // ... with all three comparisons present.
+     let condition_count = debug_str.matches("Comparison").count();
+     assert_eq!(
+         condition_count, 3,
+         "Expected 3 comparison conditions, got: {}",
+         condition_count
+     );
+ }
+}