pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2462740710
##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,276 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) -> bool {
expr.as_any().is::<Column>()
}
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray) -> FilterPredicate {
+ let mut filter_builder = FilterBuilder::new(predicate);
+ // Always optimize the filter, since it is applied to multiple arrays.
+ filter_builder = filter_builder.optimize();
+ filter_builder.build()
+}
+
+// This should be removed when https://github.com/apache/arrow-rs/pull/8693
+// is merged and becomes available.
+fn filter_record_batch(
+ record_batch: &RecordBatch,
+ filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+ let filtered_columns = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, filter))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+ // SAFETY: we start from a valid RecordBatch and the set of columns has not changed,
+ // so there is no need to revalidate the schema. The input column arrays all had the
+ // same length (they come from a valid RecordBatch), and filtering them with the same
+ // filter produces a new set of arrays that all have length `filter.count()`.
+ unsafe {
+ Ok(RecordBatch::new_unchecked(
+ record_batch.schema(),
+ filtered_columns,
+ filter.count(),
+ ))
+ }
+}
+
+#[inline(always)]
+fn filter_array(
+ array: &dyn Array,
+ filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+ filter.filter(array)
+}
+
+/// Merges elements by index from a list of [`ArrayData`], creating a new [`ArrayRef`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values`, offset by 1. `indices` is
+/// processed sequentially. The first occurrence of index value `n` is mapped to the first
+/// value of array `n - 1`, the second occurrence to the second value, and so on.
+///
+/// The index value `0` is used to indicate null values.
+///
+/// ```text
+/// ┌─────────────────┐      ┌─────────┐                            ┌─────────────────┐
+/// │        A        │      │    0    │   merge(                   │      NULL       │
+/// ├─────────────────┤      ├─────────┤     [values0, values1],    ├─────────────────┤
+/// │        D        │      │    2    │     indices                │        B        │
+/// └─────────────────┘      ├─────────┤   )                        ├─────────────────┤
+///   values array 0         │    2    │  ──────────────────────▶   │        C        │
+///                          ├─────────┤                            ├─────────────────┤
+///                          │    1    │                            │        A        │
+///                          ├─────────┤                            ├─────────────────┤
+///                          │    1    │                            │        D        │
+/// ┌─────────────────┐      ├─────────┤                            ├─────────────────┤
+/// │        B        │      │    2    │                            │        E        │
+/// ├─────────────────┤      └─────────┘                            └─────────────────┘
+/// │        C        │
+/// ├─────────────────┤        indices
+/// │        E        │         array                                     result
+/// └─────────────────┘
+///   values array 1
+/// ```
+fn merge(values: &[ArrayData], indices: &[usize]) -> Result<ArrayRef> {
Review Comment:
It's very similar to multizip indeed. The key difference with `zip` is that the arrays can all be of different lengths.
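
To make the contrast with `zip` concrete, here is a minimal std-only sketch of a row-wise select. The name `zip_select` is hypothetical and `Vec`/slices stand in for Arrow arrays; this is not Arrow's `zip` API. Every input covers all rows, so all three must have the same length. The branch arrays handed to `merge`, by contrast, only hold the rows their own branch produced.

```rust
/// Hypothetical zip-style select (std-only model, not Arrow's API):
/// row `i` of the output is `truthy[i]` when `mask[i]` is true and
/// `falsy[i]` otherwise. Every input must cover every row.
fn zip_select<T: Clone>(mask: &[bool], truthy: &[T], falsy: &[T]) -> Vec<T> {
    assert_eq!(mask.len(), truthy.len(), "zip requires equal-length inputs");
    assert_eq!(mask.len(), falsy.len(), "zip requires equal-length inputs");
    mask.iter()
        .zip(truthy.iter().zip(falsy.iter()))
        .map(|(&m, (t, f))| if m { t.clone() } else { f.clone() })
        .collect()
}
```

For example, `zip_select(&[true, false, true], &[1, 2, 3], &[10, 20, 30])` needs three full-length inputs to produce `[1, 20, 3]`. With `merge`, the same rows would come from the compact branch arrays `[1, 3]` and `[20]` together with the indices `[1, 2, 1]`.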

The key difference with `interleave` is that this is not a generalised gathering operation. We can make use of the fact that the order of the values in each array matches the order in which they should be interleaved. That means we only need the array index, and we can use `MutableArrayData::extend` to copy slices.
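
The merge strategy described above can be modelled in std-only Rust. This is a sketch, not the PR's implementation: it assumes plain `Vec<T>` inputs and `Option<T>` outputs in place of Arrow arrays and null buffers. Each run of identical indices is copied as one contiguous slice, in the spirit of `MutableArrayData::extend(index, start, end)` copying a range. An `interleave`-style gather would instead need an `(array, row)` pair for every output row.

```rust
/// Std-only model of merge: `indices[i] == 0` yields a null (`None`);
/// `indices[i] == n` takes the next unconsumed value of `values[n - 1]`.
/// Panics if an index asks for more values than its array holds.
fn merge<T: Clone>(values: &[Vec<T>], indices: &[usize]) -> Vec<Option<T>> {
    // Next unconsumed position in each input array.
    let mut cursors = vec![0usize; values.len()];
    let mut out: Vec<Option<T>> = Vec::with_capacity(indices.len());
    let mut i = 0;
    while i < indices.len() {
        let idx = indices[i];
        // Extend the run while the same index repeats.
        let mut end = i + 1;
        while end < indices.len() && indices[end] == idx {
            end += 1;
        }
        let run_len = end - i;
        if idx == 0 {
            // A run of nulls.
            out.resize(out.len() + run_len, None);
        } else {
            // Copy the whole run as one contiguous slice of array `idx - 1`.
            let start = cursors[idx - 1];
            let slice = &values[idx - 1][start..start + run_len];
            out.extend(slice.iter().cloned().map(Some));
            cursors[idx - 1] += run_len;
        }
        i = end;
    }
    out
}
```

With the inputs from the doc-comment diagram, `merge(&[vec!["A", "D"], vec!["B", "C", "E"]], &[0, 2, 2, 1, 1, 2])` yields `[None, Some("B"), Some("C"), Some("A"), Some("D"), Some("E")]`. Only four slice copies are needed for six output rows.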

I had my eye on the work @rluvaton did in zip. A possible future optimisation I had in mind was to postpone the expansion of `ColumnarValue::Scalar` in `add_branch_result` and do it in `finish` instead; if it then turns out there are only two scalars, we could try something more optimal.
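
A sketch of the deferred-expansion idea follows, with its assumptions stated up front. `BranchResult` and `finish` are hypothetical stand-ins, not the PR's `add_branch_result`/`finish`, and `Vec<T>` stands in for Arrow arrays. Scalars stay unexpanded until the end. With exactly two scalar branches, each row just picks one of two constants, which is where a specialised kernel could be used. Every other case falls back to expanding scalars and interleaving.

```rust
/// Hypothetical deferred branch result: scalars are kept unexpanded.
#[derive(Clone, Debug)]
enum BranchResult<T> {
    Scalar(T),
    Array(Vec<T>),
}

/// Hypothetical `finish`; `indices` uses the merge encoding
/// (0 = null, n = branch n - 1).
fn finish<T: Clone>(results: &[BranchResult<T>], indices: &[usize]) -> Vec<Option<T>> {
    // Fast path: exactly two scalar branches. No per-branch array is built;
    // each row selects one of the two constants directly.
    if let [BranchResult::Scalar(a), BranchResult::Scalar(b)] = results {
        return indices
            .iter()
            .map(|&idx| match idx {
                0 => None,
                1 => Some(a.clone()),
                2 => Some(b.clone()),
                _ => panic!("branch index out of range"),
            })
            .collect();
    }
    // General path: expand each scalar to as many rows as its branch
    // produced, then consume every branch's values in order.
    let expanded: Vec<Vec<T>> = results
        .iter()
        .enumerate()
        .map(|(k, r)| match r {
            BranchResult::Array(a) => a.clone(),
            BranchResult::Scalar(v) => {
                let rows = indices.iter().filter(|&&idx| idx == k + 1).count();
                vec![v.clone(); rows]
            }
        })
        .collect();
    let mut cursors = vec![0usize; expanded.len()];
    indices
        .iter()
        .map(|&idx| {
            if idx == 0 {
                return None;
            }
            let pos = cursors[idx - 1];
            cursors[idx - 1] += 1;
            Some(expanded[idx - 1][pos].clone())
        })
        .collect()
}
```

The fast path still relies on the per-row branch choice, so whatever form that bookkeeping takes is the state that must be tracked for it to pay off.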

The downside would be that in order to use zip you need to keep track of the boolean selection vectors again. The whole exercise is a balancing act between doing as little state tracking as possible to avoid introducing overhead, merging efficiently in the general multi-branch variant, and keeping the 2-3 branch cases fast.