pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2462740710


##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,276 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) -> bool {
     expr.as_any().is::<Column>()
 }
 
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray) -> FilterPredicate {
+    let mut filter_builder = FilterBuilder::new(predicate);
+    // Always optimize the filter since we use them multiple times.
+    filter_builder = filter_builder.optimize();
+    filter_builder.build()
+}
+
+// This should be removed when https://github.com/apache/arrow-rs/pull/8693
+// is merged and becomes available.
+fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+    let filtered_columns = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_array(a, filter))
+        .collect::<std::result::Result<Vec<_>, _>>()?;
+    // SAFETY: since we start from a valid RecordBatch, there's no need to revalidate the schema
+    // since the set of columns has not changed.
+    // The input column arrays all had the same length (since they're coming from a valid RecordBatch)
+    // and filtering them with the same filter will produce a new set of arrays with identical
+    // lengths.
+    unsafe {
+        Ok(RecordBatch::new_unchecked(
+            record_batch.schema(),
+            filtered_columns,
+            filter.count(),
+        ))
+    }
+}
+
+#[inline(always)]
+fn filter_array(
+    array: &dyn Array,
+    filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+    filter.filter(array)
+}
+
+///
+/// Merges elements by index from a list of [`ArrayData`], creating a new [`ColumnarValue`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values` offset by 1. `indices` is
+/// processed sequentially. The first occurrence of index value `n` will be mapped to the first
+/// value of array `n - 1`. The second occurrence to the second value, and so on.
+///
+/// The index value `0` is used to indicate null values.
+///
+/// ```text
+/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
+/// │        A        │      │    0    │        merge(                    │       NULL      │
+/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
+/// │        D        │      │    2    │          indices                 │        B        │
+/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
+///   values array 0         │    2    │      ─────────────────────────▶  │        C        │
+///                          ├─────────┤                                  ├─────────────────┤
+///                          │    1    │                                  │        A        │
+///                          ├─────────┤                                  ├─────────────────┤
+///                          │    1    │                                  │        D        │
+/// ┌─────────────────┐      ├─────────┤                                  ├─────────────────┤
+/// │        B        │      │    2    │                                  │        E        │
+/// ├─────────────────┤      └─────────┘                                  └─────────────────┘
+/// │        C        │
+/// ├─────────────────┤        indices
+/// │        E        │         array                                      result
+/// └─────────────────┘
+///   values array 1
+/// ```
+fn merge(values: &[ArrayData], indices: &[usize]) -> Result<ArrayRef> {

Review Comment:
   It's very similar to multizip indeed. The key difference from `zip` is that 
the arrays can all have different lengths.
   
   The key difference with `interleave` is that this is not a generalised 
gathering operation. We can make use of the fact that the order of the values 
in each array matches the order in which they should be interleaved. That means 
we only need the array index and we can use `MutableArrayData::extend` to copy 
slices.
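
   To make the run-copying idea concrete, here is a minimal sketch that uses 
plain `Vec`s as stand-ins for Arrow arrays. The name `merge_sketch` and the 
per-array cursor are illustrative assumptions, not the code from the PR; the 
real implementation works on `ArrayData` through `MutableArrayData`. Each run 
of equal indices becomes one contiguous slice copy, which is the role 
`MutableArrayData::extend` plays in the description above.

```rust
/// Illustrative stand-in for `merge`: index 0 yields a null, index `n` takes
/// the next unread element of `values[n - 1]`.
/// Assumption: plain `Vec`s replace Arrow arrays, `Option` models nulls.
fn merge_sketch<T: Clone>(values: &[Vec<T>], indices: &[usize]) -> Vec<Option<T>> {
    let mut out = Vec::with_capacity(indices.len());
    // One read cursor per source array; the sources may differ in length.
    let mut cursors = vec![0usize; values.len()];

    let mut start = 0;
    while start < indices.len() {
        let ix = indices[start];
        // Find the end of the run of identical index values.
        let mut end = start + 1;
        while end < indices.len() && indices[end] == ix {
            end += 1;
        }
        let run = end - start;

        if ix == 0 {
            // A run of zeros becomes a run of nulls.
            out.extend(std::iter::repeat(None).take(run));
        } else {
            // Copy one contiguous slice from the source array. This panics if
            // the source holds fewer values than `indices` asks for.
            let from = cursors[ix - 1];
            out.extend(values[ix - 1][from..from + run].iter().cloned().map(Some));
            cursors[ix - 1] = from + run;
        }
        start = end;
    }
    out
}
```

   Called with the two value arrays and the indices `[0, 2, 2, 1, 1, 2]` from 
the diagram in the doc comment, the sketch performs one null append and three 
slice copies instead of six single-element gathers.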
   
   I had my eye on the work @rluvaton did in zip. A possible future 
optimisation I had in mind was to postpone the expansion of 
`ColumnarValue::Scalar` in `add_branch_result`. Instead we could do that in 
`finish` and, if it turns out there are only two scalars, try to do something 
more efficient.
   
   The downside would be that in order to use zip you need to keep track of the 
boolean selection vectors again. The whole exercise is a balancing act between 
doing as little state tracking as possible to avoid introducing overhead, 
merging efficiently in the general multi-branch variant, and keeping the 2-3 
branch case fast.
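
   For contrast, a rough sketch of what a zip over two scalars needs. The 
function `zip_two_scalars` is hypothetical and uses plain slices rather than 
Arrow's `zip` kernel; it only illustrates that the full-length boolean 
selection vector has to be kept around until the result is produced, which is 
the extra state mentioned above.

```rust
/// Illustrative two-scalar zip: every row picks one of two scalar values
/// based on a boolean mask that covers the whole batch.
/// Assumption: the mask has no nulls; this is not Arrow's `zip` kernel.
fn zip_two_scalars<T: Clone>(mask: &[bool], if_true: &T, if_false: &T) -> Vec<T> {
    mask.iter()
        .map(|&selected| if selected { if_true.clone() } else { if_false.clone() })
        .collect()
}
```

   Unlike the index-based merge, nothing here can be copied as a slice from a 
branch result; the mask itself drives every output row.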


