bert-beyondloops commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2460347968
##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,275 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>)
-> bool {
expr.as_any().is::<Column>()
}
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray) -> FilterPredicate {
+ let mut filter_builder = FilterBuilder::new(predicate);
+ // Always optimize the filter since we use them multiple times.
+ filter_builder = filter_builder.optimize();
+ filter_builder.build()
+}
+
+// This should be removed when https://github.com/apache/arrow-rs/pull/8693
+// is merged and becomes available.
+fn filter_record_batch(
+ record_batch: &RecordBatch,
+ filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+ let filtered_columns = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, filter))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+ // SAFETY: since we start from a valid RecordBatch, there's no need to
revalidate the schema
+ // since the set of columns has not changed.
+ // The input column arrays all had the same length (since they're coming
from a valid RecordBatch)
+ // and the filtering them with the same filter will produces a new set of
arrays with identical
+ // lengths.
+ unsafe {
+ Ok(RecordBatch::new_unchecked(
+ record_batch.schema(),
+ filtered_columns,
+ filter.count(),
+ ))
+ }
+}
+
+#[inline(always)]
+fn filter_array(
+ array: &dyn Array,
+ filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+ filter.filter(array)
+}
+
+///
+/// Merges elements by index from a list of [`ArrayData`], creating a new
[`ColumnarValue`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values` offset by
1. `indices` is
+/// processed sequentially. The first occurrence of index value `n` will be
mapped to the first
+/// value of array `n - 1`. The second occurrence to the second value, and so
on.
+///
+/// The index value `0` is used to indicate null values.
+///
+/// ```text
+/// ┌─────────────────┐ ┌─────────┐
┌─────────────────┐
+/// │ A │ │ 0 │ merge( │
NULL │
+/// ├─────────────────┤ ├─────────┤ [values0, values1],
├─────────────────┤
+/// │ D │ │ 2 │ indices │
B │
+/// └─────────────────┘ ├─────────┤ )
├─────────────────┤
+/// values array 0 │ 2 │ ─────────────────────────▶ │
C │
+/// ├─────────┤
├─────────────────┤
+/// │ 1 │ │
A │
+/// ├─────────┤
├─────────────────┤
+/// │ 1 │ │
D │
+/// ┌─────────────────┐ ├─────────┤
├─────────────────┤
+/// │ B │ │ 2 │ │
E │
+/// ├─────────────────┤ └─────────┘
└─────────────────┘
+/// │ C │
+/// ├─────────────────┤ indices
+/// │ E │ array
result
+/// └─────────────────┘
+/// values array 1
+/// ```
+fn merge(values: &[ArrayData], indices: &[usize]) -> Result<ArrayRef> {
+ let data_refs = values.iter().collect();
+ let mut mutable = MutableArrayData::new(data_refs, true, indices.len());
+
+ // This loop extends the mutable array by taking slices from the partial
results.
+ //
+ // take_offsets keeps track of how many values have been taken from each
array.
+ let mut take_offsets = vec![0; values.len() + 1];
+ let mut start_row_ix = 0;
+ loop {
+ let array_ix = indices[start_row_ix];
+
+ // Determine the length of the slice to take.
+ let mut end_row_ix = start_row_ix + 1;
+ while end_row_ix < indices.len() && indices[end_row_ix] == array_ix {
+ end_row_ix += 1;
+ }
+
+ // Extend mutable with either nulls or with values from the array.
+ let start_offset = take_offsets[array_ix];
+ let end_offset = start_offset + (end_row_ix - start_row_ix);
+ if array_ix == 0 {
+ mutable.extend_nulls(end_offset - start_offset);
Review Comment:
end_offset - start_offset : this seems equal to end_row_ix - start_row_ix
which is calculated right above.
Maybe assign a local variable (slice_length?) and reuse here ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]