rluvaton commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2455638557


##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) 
-> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+    let mut filter_builder = FilterBuilder::new(predicate);
+    if optimize {
+        filter_builder = filter_builder.optimize();
+    }
+    filter_builder.build()
+}
+
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    unsafe {
+        Ok(RecordBatch::new_unchecked(
+            record_batch.schema(),
+            filtered_columns,
+            filter.count(),
+        ))
+    }
+}
+
+#[inline(always)]
+fn filter_array(
+    array: &dyn Array,
+    filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+    filter.filter(array)
+}
+
+struct ResultBuilder {
+    data_type: DataType,
+    // A Vec of partial results that should be merged. 
`partial_result_indices` contains
+    // indexes into this vec.
+    partial_results: Vec<ArrayData>,
+    // Indicates per result row from which array in `partial_results` a value 
should be taken.
+    // The indexes in this array are offset by +1. The special value 0 
indicates null values.
+    partial_result_indices: Vec<usize>,
+    // An optional result that is the covering result for all rows.
+    // This is used as an optimisation to avoid the cost of merging when all 
rows
+    // evaluate to the same case branch.
+    covering_result: Option<ColumnarValue>,

Review Comment:
   What does "covering" mean here? Consider documenting the term in the field comment.



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) 
-> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+    let mut filter_builder = FilterBuilder::new(predicate);
+    if optimize {
+        filter_builder = filter_builder.optimize();
+    }
+    filter_builder.build()
+}
+
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    unsafe {
+        Ok(RecordBatch::new_unchecked(
+            record_batch.schema(),
+            filtered_columns,
+            filter.count(),
+        ))
+    }
+}
+
+#[inline(always)]
+fn filter_array(
+    array: &dyn Array,
+    filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+    filter.filter(array)
+}
+
+struct ResultBuilder {
+    data_type: DataType,
+    // A Vec of partial results that should be merged. 
`partial_result_indices` contains
+    // indexes into this vec.
+    partial_results: Vec<ArrayData>,
+    // Indicates per result row from which array in `partial_results` a value 
should be taken.
+    // The indexes in this array are offset by +1. The special value 0 
indicates null values.
+    partial_result_indices: Vec<usize>,
+    // An optional result that is the covering result for all rows.
+    // This is used as an optimisation to avoid the cost of merging when all 
rows
+    // evaluate to the same case branch.
+    covering_result: Option<ColumnarValue>,
+}
+
+impl ResultBuilder {
+    fn new(data_type: &DataType, capacity: usize) -> Self {
+        Self {
+            data_type: data_type.clone(),
+            partial_result_indices: vec![0; capacity],
+            partial_results: vec![],
+            covering_result: None,
+        }
+    }
+
+    /// Adds a result value.
+    ///
+    /// `rows` should be a [UInt32Array] containing [RecordBatch] relative row 
indices
+    /// for which `value` contains result values.
+    ///
+    /// If `value` is a scalar, the scalar value is used for each row in 
`rows`.
+    /// If `value` is an array, the values from the array and the indices from 
`rows` will be
+    /// processed pairwise.
+    fn add_result(&mut self, rows: &ArrayRef, value: ColumnarValue) -> 
Result<()> {
+        match value {
+            ColumnarValue::Array(a) => {
+                assert_eq!(a.len(), rows.len());
+                if rows.len() == self.partial_result_indices.len() {
+                    self.set_covering_result(ColumnarValue::Array(a));
+                } else {
+                    self.add_partial_result(rows, a.to_data());
+                }
+            }
+            ColumnarValue::Scalar(s) => {
+                if rows.len() == self.partial_result_indices.len() {
+                    self.set_covering_result(ColumnarValue::Scalar(s));
+                } else {
+                    self.add_partial_result(
+                        rows,
+                        s.to_array_of_size(rows.len())?.to_data(),
+                    );
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {
+        assert!(self.covering_result.is_none());
+
+        self.partial_results.push(data);
+        let array_index = self.partial_results.len();
+
+        for row_ix in rows.as_primitive::<UInt32Type>().values().iter() {
+            self.partial_result_indices[*row_ix as usize] = array_index;
+        }
+    }
+
+    fn set_covering_result(&mut self, value: ColumnarValue) {
+        assert!(self.partial_results.is_empty());
+        self.covering_result = Some(value);
+    }
+
+    fn finish(self) -> Result<ColumnarValue> {
+        match self.covering_result {
+            Some(v) => {
+                // If we have a covering result, we can just return it.
+                Ok(v)
+            }
+            None => match self.partial_results.len() {
+                0 => {
+                    // No covering result and no partial results.
+                    // This can happen for case expressions with no else 
branch where no rows
+                    // matched.
+                    Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+                        &self.data_type,
+                    )?))
+                }
+                n => {
+                    // There are n partial results.
+                    // Merge into a single array.
+
+                    let data_refs = self.partial_results.iter().collect();
+                    let mut mutable = MutableArrayData::new(

Review Comment:
   Using a mutable array is not very performant for the case where the values being 
   merged come in small ranges from the same source array.
   
   Isn't this basically an interleave?



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -94,7 +95,7 @@ pub struct CaseExpr {
     /// Optional "else" expression
     else_expr: Option<Arc<dyn PhysicalExpr>>,
     /// Evaluation method to use
-    eval_method: EvalMethod,
+    pub eval_method: EvalMethod,

Review Comment:
   Why is the `pub` needed here?
   Also, this is an implementation detail that I don't think needs to be 
   exposed.



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) 
-> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+    let mut filter_builder = FilterBuilder::new(predicate);
+    if optimize {
+        filter_builder = filter_builder.optimize();
+    }
+    filter_builder.build()
+}
+
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    unsafe {
+        Ok(RecordBatch::new_unchecked(
+            record_batch.schema(),
+            filtered_columns,
+            filter.count(),
+        ))
+    }
+}
+
+#[inline(always)]
+fn filter_array(
+    array: &dyn Array,
+    filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+    filter.filter(array)
+}
+
+struct ResultBuilder {
+    data_type: DataType,
+    // A Vec of partial results that should be merged. 
`partial_result_indices` contains
+    // indexes into this vec.
+    partial_results: Vec<ArrayData>,
+    // Indicates per result row from which array in `partial_results` a value 
should be taken.
+    // The indexes in this array are offset by +1. The special value 0 
indicates null values.
+    partial_result_indices: Vec<usize>,
+    // An optional result that is the covering result for all rows.
+    // This is used as an optimisation to avoid the cost of merging when all 
rows
+    // evaluate to the same case branch.
+    covering_result: Option<ColumnarValue>,
+}
+
+impl ResultBuilder {
+    fn new(data_type: &DataType, capacity: usize) -> Self {
+        Self {
+            data_type: data_type.clone(),
+            partial_result_indices: vec![0; capacity],
+            partial_results: vec![],
+            covering_result: None,
+        }
+    }
+
+    /// Adds a result value.
+    ///
+    /// `rows` should be a [UInt32Array] containing [RecordBatch] relative row 
indices
+    /// for which `value` contains result values.
+    ///
+    /// If `value` is a scalar, the scalar value is used for each row in 
`rows`.
+    /// If `value` is an array, the values from the array and the indices from 
`rows` will be
+    /// processed pairwise.
+    fn add_result(&mut self, rows: &ArrayRef, value: ColumnarValue) -> 
Result<()> {
+        match value {
+            ColumnarValue::Array(a) => {
+                assert_eq!(a.len(), rows.len());
+                if rows.len() == self.partial_result_indices.len() {
+                    self.set_covering_result(ColumnarValue::Array(a));
+                } else {
+                    self.add_partial_result(rows, a.to_data());
+                }
+            }
+            ColumnarValue::Scalar(s) => {
+                if rows.len() == self.partial_result_indices.len() {
+                    self.set_covering_result(ColumnarValue::Scalar(s));
+                } else {
+                    self.add_partial_result(
+                        rows,
+                        s.to_array_of_size(rows.len())?.to_data(),
+                    );
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {

Review Comment:
   Can you add comments explaining what `rows` and `data` are for?
   
   And rename them if needed.



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -15,30 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use super::{Column, Literal};
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
-use std::borrow::Cow;
-use std::hash::Hash;
-use std::{any::Any, sync::Arc};
-
 use arrow::array::*;
 use arrow::compute::kernels::zip::zip;
-use arrow::compute::{and, and_not, is_null, not, nullif, or, 
prep_null_mask_filter};
-use arrow::datatypes::{DataType, Schema};
+use arrow::compute::{
+    is_null, not, nullif, prep_null_mask_filter, FilterBuilder, 
FilterPredicate,
+};
+use arrow::datatypes::{DataType, Schema, UInt32Type};
+use arrow::error::ArrowError;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{
     exec_err, internal_datafusion_err, internal_err, DataFusionError, Result, 
ScalarValue,
 };
 use datafusion_expr::ColumnarValue;
-
-use super::{Column, Literal};
 use datafusion_physical_expr_common::datum::compare_with_eq;
 use itertools::Itertools;
+use std::borrow::Cow;
+use std::hash::Hash;
+use std::{any::Any, sync::Arc};
 
 type WhenThen = (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>);
 
 #[derive(Debug, Hash, PartialEq, Eq)]
-enum EvalMethod {
+pub enum EvalMethod {

Review Comment:
   Why `pub` here? This is an implementation detail that I don't think we should 
   expose.



##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) 
-> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+    let mut filter_builder = FilterBuilder::new(predicate);
+    if optimize {
+        filter_builder = filter_builder.optimize();
+    }
+    filter_builder.build()
+}
+
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    unsafe {

Review Comment:
   Is this a hot loop? If not, why is `unsafe` needed? The check is inexpensive.
   
   Generally, when writing `unsafe` code you need to add a comment explaining why it 
   is safe to use and what the benefit is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to