This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb19933 [AURON #1624] Support the pushdown of ORC predicates (#1886)
0bb19933 is described below

commit 0bb19933fa6eae2cca95af27ea91f2d5be339c3f
Author: Graceful <[email protected]>
AuthorDate: Fri Jan 16 16:32:17 2026 +0800

    [AURON #1624] Support the pushdown of ORC predicates (#1886)
    
    # Which issue does this PR close?
    
    Closes #1624
    
    # Rationale for this change
    Support the push-down of ORC predicates to enhance the reading
    efficiency of ORC files
    # What changes are included in this PR?
    
    # Are there any user-facing changes?
    no
    # How was this patch tested?
    cluster test
    
    ---------
    
    Co-authored-by: duanhao-jk <[email protected]>
---
 Cargo.lock                                         |    3 +-
 Cargo.toml                                         |    2 +-
 native-engine/datafusion-ext-plans/src/orc_exec.rs | 1247 +++++++++++++++++++-
 3 files changed, 1240 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 01266312..816d296f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2794,7 +2794,7 @@ checksum = 
"42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
 [[package]]
 name = "orc-rust"
 version = "0.7.0"
-source = 
"git+https://github.com/auron-project/datafusion-orc.git?rev=919e050#919e0509b2b4b5a3bb70eea4b34633d3a3c933e8";
+source = 
"git+https://github.com/auron-project/datafusion-orc.git?rev=17f7012#17f7012bbd82cbe9967d99fb49aa0069306537fd";
 dependencies = [
  "arrow",
  "async-trait",
@@ -2807,6 +2807,7 @@ dependencies = [
  "futures",
  "futures-util",
  "iana-time-zone",
+ "log",
  "lz4_flex 0.11.5",
  "lzokay-native",
  "num",
diff --git a/Cargo.toml b/Cargo.toml
index 47470c13..e751b3a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -121,7 +121,7 @@ datafusion-execution = { git = 
"https://github.com/auron-project/datafusion.git";
 datafusion-optimizer = { git = 
"https://github.com/auron-project/datafusion.git";, rev = "9034aeffb"}
 datafusion-physical-expr = { git = 
"https://github.com/auron-project/datafusion.git";, rev = "9034aeffb"}
 datafusion-spark = { git = "https://github.com/auron-project/datafusion.git";, 
rev = "9034aeffb"}
-orc-rust = { git = "https://github.com/auron-project/datafusion-orc.git";, rev 
= "919e050"}
+orc-rust = { git = "https://github.com/auron-project/datafusion-orc.git";, rev 
= "17f7012"}
 
 # arrow: branch=v55.2.0-blaze
 arrow = { git = "https://github.com/auron-project/arrow-rs.git";, rev = 
"5de02520c"}
diff --git a/native-engine/datafusion-ext-plans/src/orc_exec.rs 
b/native-engine/datafusion-ext-plans/src/orc_exec.rs
index c53cb6b5..965b53e9 100644
--- a/native-engine/datafusion-ext-plans/src/orc_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/orc_exec.rs
@@ -29,13 +29,21 @@ use datafusion::{
     },
     error::Result,
     execution::context::TaskContext,
-    physical_expr::{EquivalenceProperties, PhysicalExprRef},
+    logical_expr::Operator,
+    physical_expr::{
+        EquivalenceProperties, PhysicalExprRef,
+        expressions::{
+            BinaryExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, 
Literal, NotExpr, SCAndExpr,
+            SCOrExpr,
+        },
+    },
     physical_plan::{
         DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
PlanProperties,
         SendableRecordBatchStream, Statistics,
         execution_plan::{Boundedness, EmissionType},
         metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
     },
+    scalar::ScalarValue,
 };
 use datafusion_datasource::PartitionedFile;
 use datafusion_ext_commons::{batch_size, df_execution_err, 
hadoop_fs::FsProvider};
@@ -45,6 +53,7 @@ use once_cell::sync::OnceCell;
 use orc_rust::{
     TimestampPrecision,
     arrow_reader::ArrowReaderBuilder,
+    predicate::{Predicate, PredicateValue},
     projection::ProjectionMask,
     reader::{AsyncChunkReader, metadata::FileMetadata},
 };
@@ -62,7 +71,7 @@ pub struct OrcExec {
     projected_statistics: Statistics,
     projected_schema: SchemaRef,
     metrics: ExecutionPlanMetricsSet,
-    _predicate: Option<PhysicalExprRef>,
+    predicate: Option<PhysicalExprRef>,
     props: OnceCell<PlanProperties>,
 }
 
@@ -72,7 +81,7 @@ impl OrcExec {
     pub fn new(
         base_config: FileScanConfig,
         fs_resource_id: String,
-        _predicate: Option<PhysicalExprRef>,
+        predicate: Option<PhysicalExprRef>,
     ) -> Self {
         let metrics = ExecutionPlanMetricsSet::new();
 
@@ -85,7 +94,7 @@ impl OrcExec {
             projected_statistics,
             projected_schema,
             metrics,
-            _predicate,
+            predicate,
             props: OnceCell::new(),
         }
     }
@@ -96,11 +105,12 @@ impl DisplayAs for OrcExec {
         let limit = self.base_config.limit;
         let projection = self.base_config.projection.clone();
         let file_group = &self.base_config.file_groups;
+        let pred = &self.predicate;
 
         write!(
             f,
-            "OrcExec: file_group={:?}, limit={:?}, projection={:?}",
-            file_group, limit, projection
+            "OrcExec: file_group={:?}, limit={:?}, projection={:?}, 
predicate={:?}",
+            file_group, limit, projection, pred
         )
     }
 }
@@ -172,6 +182,7 @@ impl ExecutionPlan for OrcExec {
             force_positional_evolution,
             use_microsecond_precision,
             is_case_sensitive,
+            predicate: self.predicate.clone(),
         });
 
         let file_stream = Box::pin(FileStream::new(
@@ -220,6 +231,7 @@ struct OrcOpener {
     force_positional_evolution: bool,
     use_microsecond_precision: bool,
     is_case_sensitive: bool,
+    predicate: Option<PhysicalExprRef>,
 }
 
 impl FileOpener for OrcOpener {
@@ -244,11 +256,12 @@ impl FileOpener for OrcOpener {
         let projected_schema = 
SchemaRef::from(self.table_schema.project(&projection)?);
         let schema_adapter = SchemaAdapter::new(
             self.table_schema.clone(),
-            projected_schema,
+            projected_schema.clone(),
             self.force_positional_evolution,
         );
         let use_microsecond = self.use_microsecond_precision;
         let is_case = self.is_case_sensitive;
+        let predicate = self.predicate.clone();
 
         Ok(Box::pin(async move {
             let mut builder = ArrowReaderBuilder::try_new_async(reader)
@@ -267,10 +280,14 @@ impl FileOpener for OrcOpener {
 
             let projection_mask =
                 
ProjectionMask::roots(builder.file_metadata().root_data_type(), projection);
-            let stream = builder
+            builder = builder
                 .with_batch_size(batch_size)
-                .with_projection(projection_mask)
-                .build_async();
+                .with_projection(projection_mask);
+            if let Some(orc_predicate) = convert_predicate_to_orc(predicate, 
&projected_schema) {
+                builder = builder.with_predicate(orc_predicate);
+            }
+
+            let stream = builder.build_async();
 
             let adapted = stream
                 .map_err(|e| ArrowError::ExternalError(Box::new(e)))
@@ -414,3 +431,1213 @@ impl OrcFileMetrics {
         Self { bytes_scanned }
     }
 }
+
+fn convert_predicate_to_orc(
+    predicate: Option<PhysicalExprRef>,
+    file_schema: &SchemaRef,
+) -> Option<Predicate> {
+    let predicate = predicate?;
+    convert_expr_to_orc(&predicate, file_schema)
+}
+
+/// Recursively collect all AND sub-conditions and flatten nested AND
+/// structures.
+fn collect_and_predicates(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+    predicates: &mut Vec<Predicate>,
+) {
+    // Handle short-circuit AND expression (SCAndExpr)
+    if let Some(sc_and) = expr.as_any().downcast_ref::<SCAndExpr>() {
+        // Recursively collect AND sub-conditions from both sides
+        collect_and_predicates(&sc_and.left, schema, predicates);
+        collect_and_predicates(&sc_and.right, schema, predicates);
+        return;
+    }
+
+    // Handle BinaryExpr with AND operator
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        if matches!(binary.op(), Operator::And) {
+            // Recursively collect AND sub-conditions from both sides
+            collect_and_predicates(binary.left(), schema, predicates);
+            collect_and_predicates(binary.right(), schema, predicates);
+            return;
+        }
+    }
+
+    // Not an AND expression, convert the whole expression
+    // (could be OR, comparison, IS NULL, etc.)
+    if let Some(pred) = convert_expr_to_orc(expr, schema) {
+        predicates.push(pred);
+    }
+}
+
+/// Recursively collect all OR sub-conditions and flatten nested OR
+/// structures.
+fn collect_or_predicates(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+    predicates: &mut Vec<Predicate>,
+) {
+    // Handle short-circuit OR expression (SCOrExpr)
+    if let Some(sc_or) = expr.as_any().downcast_ref::<SCOrExpr>() {
+        // Recursively collect OR sub-conditions from both sides
+        collect_or_predicates(&sc_or.left, schema, predicates);
+        collect_or_predicates(&sc_or.right, schema, predicates);
+        return;
+    }
+
+    // Handle BinaryExpr with OR operator
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        if matches!(binary.op(), Operator::Or) {
+            // Recursively collect OR sub-conditions from both sides
+            collect_or_predicates(binary.left(), schema, predicates);
+            collect_or_predicates(binary.right(), schema, predicates);
+            return;
+        }
+    }
+
+    // Not an OR expression, convert the whole expression
+    // (could be AND, comparison, IS NULL, etc.)
+    if let Some(pred) = convert_expr_to_orc(expr, schema) {
+        predicates.push(pred);
+    }
+}
+
+fn convert_expr_to_orc(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Option<Predicate> {
+    // Handle top-level short-circuit AND expression (SCAndExpr)
+    if let Some(_sc_and) = expr.as_any().downcast_ref::<SCAndExpr>() {
+        let mut predicates = Vec::new();
+        collect_and_predicates(expr, schema, &mut predicates);
+
+        if predicates.is_empty() {
+            return None;
+        }
+
+        if predicates.len() == 1 {
+            return Some(predicates.into_iter().next().unwrap());
+        }
+
+        return Some(Predicate::and(predicates));
+    }
+
+    // Handle top-level short-circuit OR expression (SCOrExpr)
+    if let Some(_sc_or) = expr.as_any().downcast_ref::<SCOrExpr>() {
+        let mut predicates = Vec::new();
+        collect_or_predicates(expr, schema, &mut predicates);
+
+        if predicates.is_empty() {
+            return None;
+        }
+
+        if predicates.len() == 1 {
+            return Some(predicates.into_iter().next().unwrap());
+        }
+
+        return Some(Predicate::or(predicates));
+    }
+
+    // Handle top-level AND expression (BinaryExpr with AND operator)
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        if matches!(binary.op(), Operator::And) {
+            let mut predicates = Vec::new();
+            collect_and_predicates(expr, schema, &mut predicates);
+
+            if predicates.is_empty() {
+                return None;
+            }
+
+            if predicates.len() == 1 {
+                return Some(predicates.into_iter().next().unwrap());
+            }
+
+            return Some(Predicate::and(predicates));
+        }
+
+        // Handle top-level OR expression (BinaryExpr with OR operator)
+        if matches!(binary.op(), Operator::Or) {
+            let mut predicates = Vec::new();
+            collect_or_predicates(expr, schema, &mut predicates);
+
+            if predicates.is_empty() {
+                return None;
+            }
+
+            if predicates.len() == 1 {
+                return Some(predicates.into_iter().next().unwrap());
+            }
+
+            return Some(Predicate::or(predicates));
+        }
+    }
+
+    convert_expr_to_orc_internal(expr, schema)
+}
+
+/// Internal conversion function for non-AND/OR expressions.
+fn convert_expr_to_orc_internal(
+    expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+    schema: &SchemaRef,
+) -> Option<Predicate> {
+    // Handle Literal expressions (WHERE true, WHERE false, etc.)
+    if let Some(lit) = expr.as_any().downcast_ref::<Literal>() {
+        match lit.value() {
+            ScalarValue::Boolean(Some(true)) => {
+                // WHERE true - no filtering needed, return None to skip 
predicate
+                return None;
+            }
+            ScalarValue::Boolean(Some(false)) => {
+                // WHERE false - need to filter all data
+                // Create an impossible condition using a schema column if 
available
+                // Use: column IS NULL AND column IS NOT NULL (always false)
+                if let Some(field) = schema.fields().first() {
+                    let col_name = field.name().as_str();
+                    return Some(Predicate::and(vec![
+                        Predicate::is_null(col_name),
+                        Predicate::not(Predicate::is_null(col_name)),
+                    ]));
+                }
+                // Fallback: no columns in schema, can't create a predicate
+                // using a synthetic column name to ensure all data is 
filtered.
+                let col_name = "__orc_where_false_constant__";
+                return Some(Predicate::and(vec![
+                    Predicate::is_null(col_name),
+                    Predicate::not(Predicate::is_null(col_name)),
+                ]));
+            }
+            _ => {
+                return None;
+            }
+        }
+    }
+
+    // Handle NOT expressions (WHERE NOT condition)
+    if let Some(not_expr) = expr.as_any().downcast_ref::<NotExpr>() {
+        if let Some(inner_pred) = convert_expr_to_orc(not_expr.arg(), schema) {
+            return Some(Predicate::not(inner_pred));
+        }
+        return None;
+    }
+
+    // Handle IS NULL expressions
+    if let Some(is_null) = expr.as_any().downcast_ref::<IsNullExpr>() {
+        if let Some(col) = is_null.arg().as_any().downcast_ref::<Column>() {
+            let col_name = col.name();
+            return Some(Predicate::is_null(col_name));
+        }
+        return None;
+    }
+
+    // Handle IS NOT NULL expressions
+    if let Some(is_not_null) = expr.as_any().downcast_ref::<IsNotNullExpr>() {
+        if let Some(col) = is_not_null.arg().as_any().downcast_ref::<Column>() 
{
+            let col_name = col.name();
+            return Some(Predicate::not(Predicate::is_null(col_name)));
+        }
+        return None;
+    }
+
+    // Handle IN expressions (WHERE col IN (val1, val2, ...))
+    if let Some(in_list) = expr.as_any().downcast_ref::<InListExpr>() {
+        if let Some(col) = in_list.expr().as_any().downcast_ref::<Column>() {
+            let col_name = col.name();
+
+            // Convert IN to multiple OR conditions: col = val1 OR col = val2 
OR ...
+            let mut predicates = Vec::new();
+            for list_expr in in_list.list() {
+                if let Some(lit) = 
list_expr.as_any().downcast_ref::<Literal>() {
+                    if let Some(pred_value) = 
convert_scalar_value(lit.value()) {
+                        predicates.push(Predicate::eq(col_name, pred_value));
+                    }
+                }
+            }
+
+            if predicates.is_empty() {
+                return None;
+            }
+
+            // If negated is true, it represents NOT IN
+            if in_list.negated() {
+                return Some(Predicate::not(Predicate::or(predicates)));
+            } else {
+                return Some(Predicate::or(predicates));
+            }
+        }
+        return None;
+    }
+
+    // Handle BinaryExpr (comparison operations)
+    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        let left = binary.left();
+        let right = binary.right();
+        let op = binary.op();
+
+        // AND/OR are already handled at the outer level, skip here
+        if matches!(op, Operator::And | Operator::Or) {
+            return None;
+        }
+
+        if let Some(col) = left.as_any().downcast_ref::<Column>() {
+            if let Some(lit) = right.as_any().downcast_ref::<Literal>() {
+                let col_name = col.name();
+                let value = lit.value();
+                return build_comparison_predicate(col_name, op, value);
+            }
+        }
+
+        if let Some(lit) = left.as_any().downcast_ref::<Literal>() {
+            if let Some(col) = right.as_any().downcast_ref::<Column>() {
+                let col_name = col.name();
+                let value = lit.value();
+                return build_comparison_predicate_reversed(col_name, op, 
value);
+            }
+        }
+    }
+
+    None
+}
+
+fn build_comparison_predicate(
+    col_name: &str,
+    op: &Operator,
+    value: &ScalarValue,
+) -> Option<Predicate> {
+    let predicate_value = convert_scalar_value(value)?;
+
+    match op {
+        Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+        Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+        Operator::Lt => Some(Predicate::lt(col_name, predicate_value)),
+        Operator::LtEq => Some(Predicate::lte(col_name, predicate_value)),
+        Operator::Gt => Some(Predicate::gt(col_name, predicate_value)),
+        Operator::GtEq => Some(Predicate::gte(col_name, predicate_value)),
+        _ => None,
+    }
+}
+
+fn build_comparison_predicate_reversed(
+    col_name: &str,
+    op: &Operator,
+    value: &ScalarValue,
+) -> Option<Predicate> {
+    let predicate_value = convert_scalar_value(value)?;
+
+    match op {
+        Operator::Eq => Some(Predicate::eq(col_name, predicate_value)),
+        Operator::NotEq => Some(Predicate::ne(col_name, predicate_value)),
+        Operator::Lt => Some(Predicate::gt(col_name, predicate_value)),
+        Operator::LtEq => Some(Predicate::gte(col_name, predicate_value)),
+        Operator::Gt => Some(Predicate::lt(col_name, predicate_value)),
+        Operator::GtEq => Some(Predicate::lte(col_name, predicate_value)),
+        _ => None,
+    }
+}
+
+fn convert_scalar_value(value: &ScalarValue) -> Option<PredicateValue> {
+    match value {
+        ScalarValue::Boolean(v) => Some(PredicateValue::Boolean(*v)),
+        ScalarValue::Int8(v) => Some(PredicateValue::Int8(*v)),
+        ScalarValue::Int16(v) => Some(PredicateValue::Int16(*v)),
+        ScalarValue::Int32(v) => Some(PredicateValue::Int32(*v)),
+        ScalarValue::Int64(v) => Some(PredicateValue::Int64(*v)),
+        ScalarValue::Float32(v) => Some(PredicateValue::Float32(*v)),
+        ScalarValue::Float64(v) => Some(PredicateValue::Float64(*v)),
+        ScalarValue::Utf8(v) => Some(PredicateValue::Utf8(v.clone())),
+        ScalarValue::LargeUtf8(v) => Some(PredicateValue::Utf8(v.clone())),
+        _ => None,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::{
+        logical_expr::Operator,
+        physical_expr::expressions::{
+            BinaryExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, 
Literal, NotExpr, SCAndExpr,
+            SCOrExpr,
+        },
+        scalar::ScalarValue,
+    };
+
+    use super::*;
+
+    fn create_test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("age", DataType::Int32, true),
+            Field::new("score", DataType::Float64, true),
+        ]))
+    }
+
+    #[test]
+    fn test_literal_true() {
+        let schema = create_test_schema();
+        let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        // WHERE true should return None (no filtering)
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn test_literal_false() {
+        let schema = create_test_schema();
+        let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        // WHERE false should return a predicate that filters all data
+        assert!(result.is_some());
+    }
+
+    #[test]
+    fn test_comparison_eq() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("id", 0));
+        let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert_eq!(
+            format!("{:?}", predicate),
+            "Comparison { column: \"id\", op: Equal, value: Int32(Some(42)) }"
+        );
+    }
+
+    #[test]
+    fn test_comparison_ne() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("name", 1));
+        let lit = 
Arc::new(Literal::new(ScalarValue::Utf8(Some("test".to_string()))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::NotEq, lit));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert_eq!(
+            format!("{:?}", predicate),
+            "Comparison { column: \"name\", op: NotEqual, value: 
Utf8(Some(\"test\")) }"
+        );
+    }
+
+    #[test]
+    fn test_comparison_lt_gt_lte_gte() {
+        let schema = create_test_schema();
+
+        // Test LT
+        let col = Arc::new(Column::new("age", 2));
+        let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(30))));
+        let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Lt, 
lit.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test GT
+        let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Gt, 
lit.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test LtEq
+        let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::LtEq, 
lit.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test GtEq
+        let expr = Arc::new(BinaryExpr::new(col, Operator::GtEq, lit));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+    }
+
+    #[test]
+    fn test_comparison_reversed() {
+        let schema = create_test_schema();
+
+        // Test all reversed comparison operators
+        // Format: (operator, expected_debug_string_fragment)
+
+        // Symmetric operators (Eq, NotEq) - order doesn't change semantics
+        let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+        let col = Arc::new(Column::new("id", 0));
+        let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::Eq, 
col.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        // 42 = id → id = 42
+
+        let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::NotEq, 
col.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        // 42 != id → id != 42
+
+        // Asymmetric operators - must be reversed
+
+        // Test: 42 < id → id > 42
+        let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::Lt, 
col.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be GreaterThan, not LessThan
+        assert!(
+            debug_str.contains("GreaterThan") || debug_str.contains("Gt"),
+            "Expected GreaterThan for reversed Lt, got: {}",
+            debug_str
+        );
+
+        // Test: 42 <= id → id >= 42
+        let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::LtEq, 
col.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        assert!(
+            debug_str.contains("GreaterThanOrEqual")
+                || debug_str.contains("GreaterThanEquals")
+                || debug_str.contains("Gte"),
+            "Expected GreaterThanOrEqual for reversed LtEq, got: {}",
+            debug_str
+        );
+
+        // Test: 42 > id → id < 42
+        let expr = Arc::new(BinaryExpr::new(lit.clone(), Operator::Gt, 
col.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        assert!(
+            debug_str.contains("LessThan") || debug_str.contains("Lt"),
+            "Expected LessThan for reversed Gt, got: {}",
+            debug_str
+        );
+
+        // Test: 42 >= id → id <= 42
+        let expr = Arc::new(BinaryExpr::new(lit, Operator::GtEq, col));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        assert!(
+            debug_str.contains("LessThanOrEqual")
+                || debug_str.contains("LessThanEquals")
+                || debug_str.contains("Lte"),
+            "Expected LessThanOrEqual for reversed GtEq, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_is_null() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("name", 1));
+        let expr = Arc::new(IsNullExpr::new(col));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert_eq!(format!("{:?}", predicate), "IsNull { column: \"name\" }");
+    }
+
+    #[test]
+    fn test_is_not_null() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("age", 2));
+        let expr = Arc::new(IsNotNullExpr::new(col));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert_eq!(
+            format!("{:?}", predicate),
+            "Not(IsNull { column: \"age\" })"
+        );
+    }
+
+    #[test]
+    fn test_not_expr() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("id", 0));
+        let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+        let eq_expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        let not_expr = Arc::new(NotExpr::new(eq_expr));
+
+        let result = convert_predicate_to_orc(Some(not_expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert!(format!("{:?}", predicate).starts_with("Not(Comparison"));
+    }
+
+    #[test]
+    fn test_in_list() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("id", 0));
+        let values = vec![
+            Arc::new(Literal::new(ScalarValue::Int32(Some(1))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(2))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(3))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+        ];
+        let expr = Arc::new(InListExpr::new(col, values, false, None));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        // IN list should be converted to OR of equality predicates
+        assert!(format!("{:?}", predicate).starts_with("Or(["));
+    }
+
+    #[test]
+    fn test_not_in_list() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("name", 1));
+        let values = vec![
+            Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Utf8(Some("bar".to_string()))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+        ];
+        let expr = Arc::new(InListExpr::new(col, values, true, None));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        // NOT IN should be converted to NOT(OR(...))
+        assert!(format!("{:?}", predicate).starts_with("Not(Or(["));
+    }
+
+    #[test]
+    fn test_and_simple() {
+        let schema = create_test_schema();
+        // id = 42 AND age > 18
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+        let and_expr = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+        let result = convert_predicate_to_orc(Some(and_expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert!(format!("{:?}", predicate).starts_with("And(["));
+    }
+
+    #[test]
+    fn test_and_nested_flattening() {
+        let schema = create_test_schema();
+        // ((id = 1 AND age = 2) AND name = "foo") should be flattened to 
And([...])
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+        let and1 = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+        let col3 = Arc::new(Column::new("name", 1));
+        let lit3 = 
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))));
+        let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+        let and2 = Arc::new(BinaryExpr::new(and1, Operator::And, expr3));
+
+        let result = convert_predicate_to_orc(Some(and2), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to And([cond1, cond2, cond3])
+        assert!(debug_str.starts_with("And(["));
+        // Count the number of conditions (should be 3)
+        let condition_count = debug_str.matches("Comparison").count();
+        assert_eq!(condition_count, 3);
+    }
+
+    #[test]
+    fn test_or_simple() {
+        let schema = create_test_schema();
+        // id = 1 OR id = 2
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq, 
lit1));
+
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+        let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+        let result = convert_predicate_to_orc(Some(or_expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        assert!(format!("{:?}", predicate).starts_with("Or(["));
+    }
+
+    #[test]
+    fn test_or_nested_flattening() {
+        let schema = create_test_schema();
+        // ((id = 1 OR age = 2) OR score = 3.0) should be flattened
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+        let or1 = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+        let col3 = Arc::new(Column::new("score", 3));
+        let lit3 = Arc::new(Literal::new(ScalarValue::Float64(Some(3.0))));
+        let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+        let or2 = Arc::new(BinaryExpr::new(or1, Operator::Or, expr3));
+
+        let result = convert_predicate_to_orc(Some(or2), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to Or([cond1, cond2, cond3])
+        assert!(debug_str.starts_with("Or(["));
+        let condition_count = debug_str.matches("Comparison").count();
+        assert_eq!(condition_count, 3);
+    }
+
+    #[test]
+    fn test_complex_mixed_predicates() {
+        let schema = create_test_schema();
+        // (id = 1 OR id = 2) AND name IS NOT NULL
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq, 
lit1));
+
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+        let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+        let col2 = Arc::new(Column::new("name", 1));
+        let is_not_null = Arc::new(IsNotNullExpr::new(col2));
+
+        let and_expr = Arc::new(BinaryExpr::new(or_expr, Operator::And, 
is_not_null));
+
+        let result = convert_predicate_to_orc(Some(and_expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should have And at top level
+        assert!(
+            debug_str.contains("And"),
+            "Expected And, got: {}",
+            debug_str
+        );
+        // Should contain OR for the id conditions
+        assert!(debug_str.contains("Or"), "Expected Or, got: {}", debug_str);
+        // Should contain the IS NOT NULL condition
+        assert!(
+            debug_str.contains("IsNull"),
+            "Expected IsNull, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_deeply_nested_and() {
+        let schema = create_test_schema();
+        // Build: (((id = 1 AND age = 2) AND name = "foo") AND score = 3.0)
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Eq, lit2));
+
+        let and1 = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+        let col3 = Arc::new(Column::new("name", 1));
+        let lit3 = 
Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))));
+        let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+        let and2 = Arc::new(BinaryExpr::new(and1, Operator::And, expr3));
+
+        let col4 = Arc::new(Column::new("score", 3));
+        let lit4 = Arc::new(Literal::new(ScalarValue::Float64(Some(3.0))));
+        let expr4 = Arc::new(BinaryExpr::new(col4, Operator::Eq, lit4));
+
+        let and3 = Arc::new(BinaryExpr::new(and2, Operator::And, expr4));
+
+        let result = convert_predicate_to_orc(Some(and3), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to And([cond1, cond2, cond3, cond4])
+        assert!(debug_str.starts_with("And(["));
+        let condition_count = debug_str.matches("Comparison").count();
+        assert_eq!(condition_count, 4);
+    }
+
+    #[test]
+    fn test_all_scalar_types() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col_bool", DataType::Boolean, true),
+            Field::new("col_i8", DataType::Int8, true),
+            Field::new("col_i16", DataType::Int16, true),
+            Field::new("col_i32", DataType::Int32, true),
+            Field::new("col_i64", DataType::Int64, true),
+            Field::new("col_f32", DataType::Float32, true),
+            Field::new("col_f64", DataType::Float64, true),
+            Field::new("col_utf8", DataType::Utf8, true),
+        ]));
+
+        // Test Boolean
+        let col = Arc::new(Column::new("col_bool", 0));
+        let lit = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Int8
+        let col = Arc::new(Column::new("col_i8", 1));
+        let lit = Arc::new(Literal::new(ScalarValue::Int8(Some(42))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Int16
+        let col = Arc::new(Column::new("col_i16", 2));
+        let lit = Arc::new(Literal::new(ScalarValue::Int16(Some(1000))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Int32
+        let col = Arc::new(Column::new("col_i32", 3));
+        let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(100000))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Int64
+        let col = Arc::new(Column::new("col_i64", 4));
+        let lit = Arc::new(Literal::new(ScalarValue::Int64(Some(1000000000))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Float32
+        let col = Arc::new(Column::new("col_f32", 5));
+        let lit = Arc::new(Literal::new(ScalarValue::Float32(Some(3.14))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Float64
+        let col = Arc::new(Column::new("col_f64", 6));
+        let lit = Arc::new(Literal::new(ScalarValue::Float64(Some(2.718))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+
+        // Test Utf8
+        let col = Arc::new(Column::new("col_utf8", 7));
+        let lit = 
Arc::new(Literal::new(ScalarValue::Utf8(Some("test".to_string()))));
+        let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, lit));
+        assert!(convert_predicate_to_orc(Some(expr), &schema).is_some());
+    }
+
+    #[test]
+    fn test_null_literal_in_comparison() {
+        let schema = create_test_schema();
+
+        // Test: WHERE id = NULL
+        // Note: In SQL semantics, "col = NULL" always evaluates to NULL (not 
true or
+        // false) However, we should still handle it gracefully
+        let col = Arc::new(Column::new("id", 0));
+        let null_lit = Arc::new(Literal::new(ScalarValue::Int32(None)));
+        let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Eq, 
null_lit.clone()));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        // Should convert to an ORC predicate (even though semantically it 
won't match
+        // anything)
+        assert!(result.is_some());
+
+        // Test: WHERE id != NULL
+        let expr = Arc::new(BinaryExpr::new(
+            col.clone(),
+            Operator::NotEq,
+            null_lit.clone(),
+        ));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test: WHERE id < NULL
+        let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Lt, 
null_lit.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test: WHERE id > NULL
+        let expr = Arc::new(BinaryExpr::new(col.clone(), Operator::Gt, 
null_lit.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test: WHERE NULL = id (reversed)
+        let expr = Arc::new(BinaryExpr::new(null_lit.clone(), Operator::Eq, 
col.clone()));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test: WHERE NULL < id (reversed)
+        let expr = Arc::new(BinaryExpr::new(null_lit, Operator::Lt, col));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+    }
+
+    #[test]
+    fn test_null_in_in_list() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("id", 0));
+
+        // Test: WHERE id IN (1, NULL, 3)
+        // This should handle NULL gracefully
+        let values = vec![
+            Arc::new(Literal::new(ScalarValue::Int32(Some(1))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Int32(None)))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(3))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+        ];
+        let expr = Arc::new(InListExpr::new(col.clone(), values, false, None));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be converted to OR of equality predicates
+        // NULL values should be included (even though they won't match)
+        assert!(debug_str.starts_with("Or(["));
+    }
+
+    #[test]
+    fn test_null_in_not_in_list() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("name", 1));
+
+        // Test: WHERE name NOT IN ('foo', NULL, 'bar')
+        let values = vec![
+            Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_string()))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Utf8(None)))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Utf8(Some("bar".to_string()))))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+        ];
+        let expr = Arc::new(InListExpr::new(col, values, true, None));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be NOT(OR(...))
+        assert!(debug_str.starts_with("Not(Or(["));
+    }
+
+    #[test]
+    fn test_all_null_in_list() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("id", 0));
+
+        // Test: WHERE id IN (NULL, NULL, NULL)
+        // Edge case: all values are NULL
+        let values = vec![
+            Arc::new(Literal::new(ScalarValue::Int32(None)))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Int32(None)))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+            Arc::new(Literal::new(ScalarValue::Int32(None)))
+                as Arc<dyn datafusion::physical_expr::PhysicalExpr>,
+        ];
+        let expr = Arc::new(InListExpr::new(col, values, false, None));
+
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        // Should still produce a predicate (though it won't match anything)
+        assert!(result.is_some());
+    }
+
+    #[test]
+    fn test_null_with_and_predicate() {
+        let schema = create_test_schema();
+
+        // Test: WHERE id = NULL AND age > 18
+        let col1 = Arc::new(Column::new("id", 0));
+        let null_lit = Arc::new(Literal::new(ScalarValue::Int32(None)));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, null_lit));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+        let and_expr = Arc::new(BinaryExpr::new(expr1, Operator::And, expr2));
+
+        let result = convert_predicate_to_orc(Some(and_expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to And([...])
+        assert!(debug_str.contains("And"));
+    }
+
+    #[test]
+    fn test_null_with_or_predicate() {
+        let schema = create_test_schema();
+
+        // Test: WHERE id = NULL OR age > 18
+        let col1 = Arc::new(Column::new("id", 0));
+        let null_lit = Arc::new(Literal::new(ScalarValue::Int32(None)));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, null_lit));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+        let or_expr = Arc::new(BinaryExpr::new(expr1, Operator::Or, expr2));
+
+        let result = convert_predicate_to_orc(Some(or_expr), &schema);
+        assert!(result.is_some());
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to Or([...])
+        assert!(debug_str.contains("Or"));
+    }
+
+    #[test]
+    fn test_various_null_types() {
+        let schema = create_test_schema();
+
+        // Test NULL with different data types
+        let test_cases = vec![
+            ("id", ScalarValue::Int32(None)),
+            ("name", ScalarValue::Utf8(None)),
+            ("age", ScalarValue::Int32(None)),
+            ("score", ScalarValue::Float64(None)),
+        ];
+
+        for (col_name, null_value) in test_cases {
+            let col = Arc::new(Column::new(col_name, 
schema.index_of(col_name).unwrap()));
+            let null_lit = Arc::new(Literal::new(null_value));
+            let expr = Arc::new(BinaryExpr::new(col, Operator::Eq, null_lit));
+
+            let result = convert_predicate_to_orc(Some(expr), &schema);
+            assert!(
+                result.is_some(),
+                "Failed to convert NULL comparison for column: {}",
+                col_name
+            );
+        }
+    }
+
+    #[test]
+    fn test_null_literal_edge_cases() {
+        let schema = create_test_schema();
+        let col = Arc::new(Column::new("id", 0));
+
+        // Test: WHERE id >= NULL
+        let null_lit = Arc::new(Literal::new(ScalarValue::Int32(None)));
+        let expr = Arc::new(BinaryExpr::new(
+            col.clone(),
+            Operator::GtEq,
+            null_lit.clone(),
+        ));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+
+        // Test: WHERE id <= NULL
+        let expr = Arc::new(BinaryExpr::new(col, Operator::LtEq, null_lit));
+        let result = convert_predicate_to_orc(Some(expr), &schema);
+        assert!(result.is_some());
+    }
+
+    #[test]
+    fn test_where_false_with_empty_schema() {
+        // Edge case: WHERE false with an empty schema (no columns)
+        let empty_schema = Arc::new(Schema::empty());
+        let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))));
+
+        let result = convert_predicate_to_orc(Some(expr), &empty_schema);
+
+        // Should still produce a predicate using a synthetic column name
+        // to ensure all data is filtered
+        assert!(
+            result.is_some(),
+            "WHERE false should produce a predicate even with empty schema"
+        );
+
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+
+        // Should use the synthetic column name
+        assert!(
+            debug_str.contains("__orc_where_false_constant__") || 
debug_str.contains("And"),
+            "Expected synthetic column or And predicate, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_where_true_with_empty_schema() {
+        // Edge case: WHERE true with an empty schema
+        let empty_schema = Arc::new(Schema::empty());
+        let expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
+
+        let result = convert_predicate_to_orc(Some(expr), &empty_schema);
+
+        // WHERE true should always return None (no filtering)
+        assert!(
+            result.is_none(),
+            "WHERE true should not produce any predicate"
+        );
+    }
+
+    #[test]
+    fn test_short_circuit_and_expr() {
+        let schema = create_test_schema();
+
+        // Test: SCAndExpr (short-circuit AND)
+        // Simulating: WHERE id = 42 AND age > 18
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+        let sc_and_expr = Arc::new(SCAndExpr::new(expr1, expr2));
+
+        let result = convert_predicate_to_orc(Some(sc_and_expr), &schema);
+        assert!(result.is_some(), "SCAndExpr should convert to predicate");
+
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to And([...])
+        assert!(
+            debug_str.contains("And"),
+            "Expected And predicate, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_short_circuit_or_expr() {
+        let schema = create_test_schema();
+
+        // Test: SCOrExpr (short-circuit OR)
+        // Simulating: WHERE id = 1 OR id = 2
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq, 
lit1));
+
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+        let sc_or_expr = Arc::new(SCOrExpr::new(expr1, expr2));
+
+        let result = convert_predicate_to_orc(Some(sc_or_expr), &schema);
+        assert!(result.is_some(), "SCOrExpr should convert to predicate");
+
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should be flattened to Or([...])
+        assert!(
+            debug_str.contains("Or"),
+            "Expected Or predicate, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_nested_short_circuit_exprs() {
+        let schema = create_test_schema();
+
+        // Test: Nested SCAndExpr and SCOrExpr
+        // Simulating: WHERE (id = 1 OR id = 2) AND age > 18
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(1))));
+        let expr1 = Arc::new(BinaryExpr::new(col1.clone(), Operator::Eq, 
lit1));
+
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(2))));
+        let expr2 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit2));
+
+        let sc_or_expr = Arc::new(SCOrExpr::new(expr1, expr2));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit3 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+        let expr3 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit3));
+
+        let sc_and_expr = Arc::new(SCAndExpr::new(sc_or_expr, expr3));
+
+        let result = convert_predicate_to_orc(Some(sc_and_expr), &schema);
+        assert!(
+            result.is_some(),
+            "Nested SCAndExpr/SCOrExpr should convert to predicate"
+        );
+
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should have And at top level with Or inside
+        assert!(
+            debug_str.contains("And"),
+            "Expected And at top level, got: {}",
+            debug_str
+        );
+        assert!(
+            debug_str.contains("Or"),
+            "Expected Or inside And, got: {}",
+            debug_str
+        );
+    }
+
+    #[test]
+    fn test_mixed_binary_and_short_circuit() {
+        let schema = create_test_schema();
+
+        // Test: Mix of BinaryExpr (AND) and SCAndExpr
+        // Simulating: WHERE id = 42 AND (age > 18 AND status = 'ACTIVE')
+        let col1 = Arc::new(Column::new("id", 0));
+        let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(42))));
+        let expr1 = Arc::new(BinaryExpr::new(col1, Operator::Eq, lit1));
+
+        let col2 = Arc::new(Column::new("age", 2));
+        let lit2 = Arc::new(Literal::new(ScalarValue::Int32(Some(18))));
+        let expr2 = Arc::new(BinaryExpr::new(col2, Operator::Gt, lit2));
+
+        let col3 = Arc::new(Column::new("name", 1));
+        let lit3 = 
Arc::new(Literal::new(ScalarValue::Utf8(Some("ACTIVE".to_string()))));
+        let expr3 = Arc::new(BinaryExpr::new(col3, Operator::Eq, lit3));
+
+        let sc_and_inner = Arc::new(SCAndExpr::new(expr2, expr3));
+        let binary_and_outer = Arc::new(BinaryExpr::new(expr1, Operator::And, 
sc_and_inner));
+
+        let result = convert_predicate_to_orc(Some(binary_and_outer), &schema);
+        assert!(
+            result.is_some(),
+            "Mixed BinaryExpr/SCAndExpr should convert to predicate"
+        );
+
+        let predicate = result.unwrap();
+        let debug_str = format!("{:?}", predicate);
+        // Should all be flattened to And([...])
+        assert!(
+            debug_str.contains("And"),
+            "Expected And predicate, got: {}",
+            debug_str
+        );
+        // Should have 3 conditions flattened
+        let condition_count = debug_str.matches("Comparison").count();
+        assert_eq!(
+            condition_count, 3,
+            "Expected 3 comparison conditions, got: {}",
+            condition_count
+        );
+    }
+}

Reply via email to