pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2456088760
##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) -> bool {
expr.as_any().is::<Column>()
}
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+ let mut filter_builder = FilterBuilder::new(predicate);
+ if optimize {
+ filter_builder = filter_builder.optimize();
+ }
+ filter_builder.build()
+}
+
+fn filter_record_batch(
+ record_batch: &RecordBatch,
+ filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+ let filtered_columns = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, filter))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+ unsafe {
+ Ok(RecordBatch::new_unchecked(
+ record_batch.schema(),
+ filtered_columns,
+ filter.count(),
+ ))
+ }
+}
+
+#[inline(always)]
+fn filter_array(
+ array: &dyn Array,
+ filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+ filter.filter(array)
+}
+
+struct ResultBuilder {
+ data_type: DataType,
+ // A Vec of partial results that should be merged. `partial_result_indices` contains
+ // indexes into this vec.
+ partial_results: Vec<ArrayData>,
+ // Indicates per result row from which array in `partial_results` a value should be taken.
+ // The indexes in this array are offset by +1. The special value 0 indicates null values.
+ partial_result_indices: Vec<usize>,
+ // An optional result that is the covering result for all rows.
+ // This is used as an optimisation to avoid the cost of merging when all rows
+ // evaluate to the same case branch.
+ covering_result: Option<ColumnarValue>,
+}
+
+impl ResultBuilder {
+ fn new(data_type: &DataType, capacity: usize) -> Self {
+ Self {
+ data_type: data_type.clone(),
+ partial_result_indices: vec![0; capacity],
+ partial_results: vec![],
+ covering_result: None,
+ }
+ }
+
+ /// Adds a result value.
+ ///
+ /// `rows` should be a [UInt32Array] containing [RecordBatch] relative row indices
+ /// for which `value` contains result values.
+ ///
+ /// If `value` is a scalar, the scalar value is used for each row in `rows`.
+ /// If `value` is an array, the values from the array and the indices from `rows` will be
+ /// processed pairwise.
+ fn add_result(&mut self, rows: &ArrayRef, value: ColumnarValue) -> Result<()> {
+ match value {
+ ColumnarValue::Array(a) => {
+ assert_eq!(a.len(), rows.len());
+ if rows.len() == self.partial_result_indices.len() {
+ self.set_covering_result(ColumnarValue::Array(a));
+ } else {
+ self.add_partial_result(rows, a.to_data());
+ }
+ }
+ ColumnarValue::Scalar(s) => {
+ if rows.len() == self.partial_result_indices.len() {
+ self.set_covering_result(ColumnarValue::Scalar(s));
+ } else {
+ self.add_partial_result(
+ rows,
+ s.to_array_of_size(rows.len())?.to_data(),
+ );
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {
+ assert!(self.covering_result.is_none());
+
+ self.partial_results.push(data);
+ let array_index = self.partial_results.len();
+
+ for row_ix in rows.as_primitive::<UInt32Type>().values().iter() {
+ self.partial_result_indices[*row_ix as usize] = array_index;
+ }
+ }
+
+ fn set_covering_result(&mut self, value: ColumnarValue) {
+ assert!(self.partial_results.is_empty());
+ self.covering_result = Some(value);
+ }
+
+ fn finish(self) -> Result<ColumnarValue> {
+ match self.covering_result {
+ Some(v) => {
+ // If we have a covering result, we can just return it.
+ Ok(v)
+ }
+ None => match self.partial_results.len() {
+ 0 => {
+ // No covering result and no partial results.
+ // This can happen for case expressions with no else branch where no rows
+ // matched.
+ Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+ &self.data_type,
+ )?))
+ }
+ n => {
+ // There are n partial results.
+ // Merge into a single array.
+
+ let data_refs = self.partial_results.iter().collect();
+ let mut mutable = MutableArrayData::new(
Review Comment:
It might be useful to explicitly point out that the big change in this PR is
that the per-branch results are no longer scattered back to the length of the
input record batch. Instead, the potentially small results are held as-is in
small arrays, and only at the end does everything get consolidated.
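For anyone skimming the diff, the consolidation idea can be sketched with a std-only toy (all names here are hypothetical; the real code keeps Arrow `ArrayData` partials and merges them with `MutableArrayData` rather than plain `Vec`s):

```rust
// Each branch stores only the rows it matched (a small Vec), and a per-row
// index records which branch produced each row, offset by +1 so that 0 can
// mean "no branch matched" (null).
fn merge(partials: &[Vec<i64>], indices: &[usize]) -> Vec<Option<i64>> {
    // One cursor per partial result; values are consumed in row order.
    let mut cursors = vec![0usize; partials.len()];
    indices
        .iter()
        .map(|&ix| {
            if ix == 0 {
                None // no branch matched this row -> null
            } else {
                let v = partials[ix - 1][cursors[ix - 1]];
                cursors[ix - 1] += 1;
                Some(v)
            }
        })
        .collect()
}

fn main() {
    // Branch 1 matched rows 0 and 3, branch 2 matched row 2; row 1 matched nothing.
    let partials = vec![vec![10, 40], vec![30]];
    let indices = [1, 0, 2, 1];
    assert_eq!(
        merge(&partials, &indices),
        vec![Some(10), None, Some(30), Some(40)]
    );
}
```

The point of the sketch is that no branch result is ever expanded to the full batch length; the only full-length structure is the index vector, which is filled once and read once.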
What's not yet handled in an optimal fashion by this code is the variant
you're working on, where you know up front that all the values are going to be
scalars. That's intentional; I'm only trying to improve the general case first.
In other words, please compare with the status quo on `main`, not with all
the potential further optimisations that might be possible.
One further optimisation that would be nice to add would be based on
https://github.com/apache/arrow-rs/pull/8658. That would allow us to avoid
expanding scalars to arrays and instead fold that into the merge operation. Not
available for use just yet though, so that will have to wait for later.
https://github.com/apache/arrow-rs/pull/8653 will be useful for
`ExprOrExpr`, but it is going to be of more limited use in the general eval
methods. You can only zip two scalars, and at that point you have a scalar and a
non-scalar. When we have to reduce more than two arrays it's back to regular
zip (which is kind of what I'm doing here, but without the alignment
requirement and in a single pass for all arrays).
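To illustrate that last point, here is a std-only sketch (hypothetical helpers, plain slices standing in for Arrow arrays, ignoring the scalar/array distinction) of why reducing N branch results by pairwise zip takes N-1 full passes, while a per-row branch index does it in one:

```rust
// Stand-in for a pairwise zip kernel: take from `truthy` where the mask is
// true, otherwise from `falsy`. Both inputs must be aligned to full length.
fn zip(mask: &[bool], truthy: &[i64], falsy: &[i64]) -> Vec<i64> {
    mask.iter()
        .zip(truthy.iter().zip(falsy))
        .map(|(&m, (&t, &f))| if m { t } else { f })
        .collect()
}

// Single-pass alternative: `which[row]` names the branch that produced `row`.
fn merge_once(branches: &[&[i64]], which: &[usize]) -> Vec<i64> {
    which.iter().enumerate().map(|(row, &b)| branches[b][row]).collect()
}

fn main() {
    let (a, b, c) = (vec![1i64, 1, 1, 1], vec![2i64, 2, 2, 2], vec![3i64, 3, 3, 3]);
    let pick_a = [true, false, false, true];
    let pick_b = [false, true, false, false]; // rows not picked fall through to c

    // Two full passes over the data...
    let bc = zip(&pick_b, &b, &c);
    let abc = zip(&pick_a, &a, &bc);

    // ...versus one pass driven by per-row branch indices.
    let merged = merge_once(&[a.as_slice(), b.as_slice(), c.as_slice()], &[0, 1, 2, 0]);

    assert_eq!(abc, vec![1, 2, 3, 1]);
    assert_eq!(abc, merged);
}
```

Note the sketch still assumes aligned, full-length branch arrays; the code in this PR avoids even that by keeping the partials short and consuming them cursor-style during the merge.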
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]