pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2455886379


##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) 
-> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Builds a [FilterPredicate] for the given boolean selection mask.
+///
+/// When `optimize` is true the [FilterBuilder] is allowed to analyze the mask
+/// and choose a specialized filtering strategy before building.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+    let builder = FilterBuilder::new(predicate);
+    let builder = if optimize { builder.optimize() } else { builder };
+    builder.build()
+}
+
+/// Applies `filter` to every column of `record_batch` and reassembles the
+/// surviving rows into a new [RecordBatch] with the batch's original schema.
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    // Filter each column with the same pre-built predicate; the fallible
+    // collect short-circuits on the first column that fails to filter.
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    // SAFETY: the schema comes from the batch whose columns were filtered, so
+    // the columns still match it, and applying the same predicate to every
+    // column leaves each with exactly `filter.count()` rows — the invariants
+    // `RecordBatch::new_unchecked` requires but does not re-validate.
+    unsafe {
+        Ok(RecordBatch::new_unchecked(
+            record_batch.schema(),
+            filtered_columns,
+            filter.count(),
+        ))
+    }
+}
+
+/// Thin wrapper that applies a pre-built [FilterPredicate] to a single array,
+/// so all columns of a batch share one analyzed predicate.
+#[inline(always)]
+fn filter_array(array: &dyn Array, filter: &FilterPredicate) -> std::result::Result<ArrayRef, ArrowError> {
+    filter.filter(array)
+}
+
+/// Incrementally assembles the result of a CASE expression evaluation from
+/// the values produced by the individual case branches.
+struct ResultBuilder {
+    // The data type of the values being built.
+    data_type: DataType,
+    // A Vec of partial results that should be merged.
+    // `partial_result_indices` contains indexes into this vec.
+    partial_results: Vec<ArrayData>,
+    // Indicates per result row from which array in `partial_results` a value
+    // should be taken. The indexes in this array are offset by +1; the
+    // special value 0 indicates null values.
+    partial_result_indices: Vec<usize>,
+    // An optional result that is the covering result for all rows.
+    // This is used as an optimisation to avoid the cost of merging when all
+    // rows evaluate to the same case branch.
+    covering_result: Option<ColumnarValue>,
+}
+
+impl ResultBuilder {
+    /// Creates a builder producing values of `data_type` for a result of
+    /// `capacity` rows. Every row starts at index 0, i.e. null.
+    fn new(data_type: &DataType, capacity: usize) -> Self {
+        Self {
+            data_type: data_type.clone(),
+            partial_results: Vec::new(),
+            partial_result_indices: vec![0; capacity],
+            covering_result: None,
+        }
+    }
+
+    /// Adds a result value.
+    ///
+    /// `rows` should be a [UInt32Array] containing [RecordBatch] relative row 
indices
+    /// for which `value` contains result values.
+    ///
+    /// If `value` is a scalar, the scalar value is used for each row in 
`rows`.
+    /// If `value` is an array, the values from the array and the indices from 
`rows` will be
+    /// processed pairwise.
+    fn add_result(&mut self, rows: &ArrayRef, value: ColumnarValue) -> 
Result<()> {
+        match value {
+            ColumnarValue::Array(a) => {
+                assert_eq!(a.len(), rows.len());
+                if rows.len() == self.partial_result_indices.len() {
+                    self.set_covering_result(ColumnarValue::Array(a));
+                } else {
+                    self.add_partial_result(rows, a.to_data());
+                }
+            }
+            ColumnarValue::Scalar(s) => {
+                if rows.len() == self.partial_result_indices.len() {
+                    self.set_covering_result(ColumnarValue::Scalar(s));
+                } else {
+                    self.add_partial_result(
+                        rows,
+                        s.to_array_of_size(rows.len())?.to_data(),
+                    );
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Registers `data` as a new partial result and points every row listed
+    /// in `rows` at it.
+    fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {
+        // Partial results and a covering result are mutually exclusive.
+        assert!(self.covering_result.is_none());
+
+        self.partial_results.push(data);
+        // Stored row indexes are offset by +1 (0 is reserved for null), so
+        // the post-push length is exactly the 1-based index of `data`.
+        let one_based_index = self.partial_results.len();
+        for &row in rows.as_primitive::<UInt32Type>().values().iter() {
+            self.partial_result_indices[row as usize] = one_based_index;
+        }
+    }
+
+    /// Stores `value` as the single result covering every row of the batch.
+    fn set_covering_result(&mut self, value: ColumnarValue) {
+        // A covering result and partial results are mutually exclusive.
+        assert!(self.partial_results.is_empty());
+        self.covering_result = Some(value);
+    }
+
+    /// Consumes the builder and produces the final [ColumnarValue].
+    // NOTE(review): the quoted diff is truncated below — the merge path via
+    // `MutableArrayData` continues past the end of this hunk.
+    fn finish(self) -> Result<ColumnarValue> {
+        match self.covering_result {
+            Some(v) => {
+                // If we have a covering result, we can just return it.
+                Ok(v)
+            }
+            None => match self.partial_results.len() {
+                0 => {
+                    // No covering result and no partial results.
+                    // This can happen for case expressions with no else
+                    // branch where no rows matched.
+                    Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+                        &self.data_type,
+                    )?))
+                }
+                n => {
+                    // There are n partial results.
+                    // Merge into a single array.
+
+                    let data_refs = self.partial_results.iter().collect();
+                    let mut mutable = MutableArrayData::new(

Review Comment:
   No, what I'm doing here is less general than `interleave`. I tried that at 
first, but it gave pretty bad performance in certain cases. This is more like a 
multi-array `zip`. In contrast to `zip`, the rows are not expected to be lined 
up here; instead, values are taken from the start of each array. I'll try to 
make an ASCII-art drawing of what's going on.
   
   Regarding the usage of `MutableArrayData`, I took inspiration from the `zip` 
implementation. I tried to avoid reaching this point in the trivial cases to 
avoid overhead where possible. This code path is not taken for the simple 
evaluation methods either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to