pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2456088760
##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>)
-> bool {
expr.as_any().is::<Column>()
}
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+ let mut filter_builder = FilterBuilder::new(predicate);
+ if optimize {
+ filter_builder = filter_builder.optimize();
+ }
+ filter_builder.build()
+}
+
+fn filter_record_batch(
+ record_batch: &RecordBatch,
+ filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+ let filtered_columns = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, filter))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+ unsafe {
+ Ok(RecordBatch::new_unchecked(
+ record_batch.schema(),
+ filtered_columns,
+ filter.count(),
+ ))
+ }
+}
+
+#[inline(always)]
+fn filter_array(
+ array: &dyn Array,
+ filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+ filter.filter(array)
+}
+
+struct ResultBuilder {
+ data_type: DataType,
+ // A Vec of partial results that should be merged.
`partial_result_indices` contains
+ // indexes into this vec.
+ partial_results: Vec<ArrayData>,
+ // Indicates per result row from which array in `partial_results` a value
should be taken.
+ // The indexes in this array are offset by +1. The special value 0
indicates null values.
+ partial_result_indices: Vec<usize>,
+ // An optional result that is the covering result for all rows.
+ // This is used as an optimisation to avoid the cost of merging when all
rows
+ // evaluate to the same case branch.
+ covering_result: Option<ColumnarValue>,
+}
+
+impl ResultBuilder {
+ fn new(data_type: &DataType, capacity: usize) -> Self {
+ Self {
+ data_type: data_type.clone(),
+ partial_result_indices: vec![0; capacity],
+ partial_results: vec![],
+ covering_result: None,
+ }
+ }
+
+ /// Adds a result value.
+ ///
+ /// `rows` should be a [UInt32Array] containing [RecordBatch] relative row
indices
+ /// for which `value` contains result values.
+ ///
+ /// If `value` is a scalar, the scalar value is used for each row in
`rows`.
+ /// If `value` is an array, the values from the array and the indices from
`rows` will be
+ /// processed pairwise.
+ fn add_result(&mut self, rows: &ArrayRef, value: ColumnarValue) ->
Result<()> {
+ match value {
+ ColumnarValue::Array(a) => {
+ assert_eq!(a.len(), rows.len());
+ if rows.len() == self.partial_result_indices.len() {
+ self.set_covering_result(ColumnarValue::Array(a));
+ } else {
+ self.add_partial_result(rows, a.to_data());
+ }
+ }
+ ColumnarValue::Scalar(s) => {
+ if rows.len() == self.partial_result_indices.len() {
+ self.set_covering_result(ColumnarValue::Scalar(s));
+ } else {
+ self.add_partial_result(
+ rows,
+ s.to_array_of_size(rows.len())?.to_data(),
+ );
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {
+ assert!(self.covering_result.is_none());
+
+ self.partial_results.push(data);
+ let array_index = self.partial_results.len();
+
+ for row_ix in rows.as_primitive::<UInt32Type>().values().iter() {
+ self.partial_result_indices[*row_ix as usize] = array_index;
+ }
+ }
+
+ fn set_covering_result(&mut self, value: ColumnarValue) {
+ assert!(self.partial_results.is_empty());
+ self.covering_result = Some(value);
+ }
+
+ fn finish(self) -> Result<ColumnarValue> {
+ match self.covering_result {
+ Some(v) => {
+ // If we have a covering result, we can just return it.
+ Ok(v)
+ }
+ None => match self.partial_results.len() {
+ 0 => {
+ // No covering result and no partial results.
+ // This can happen for case expressions with no else
branch where no rows
+ // matched.
+ Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
+ &self.data_type,
+ )?))
+ }
+ n => {
+ // There are n partial results.
+ // Merge into a single array.
+
+ let data_refs = self.partial_results.iter().collect();
+ let mut mutable = MutableArrayData::new(
Review Comment:
It might be useful to explicitly point out that the big change in this PR is
that the per branch results are no longer scattered back to the length of the
input record batch. Instead the potentially small results are held as is in
small arrays. Only at the end everything gets consolidated.
What's not yet handled in an optimal fashion by this code is the variant
you're working where you know up front that all the values are going to be
scalars. That's intentional, I'm only trying to improve the general case first.
In other words, please compare with the status quo on `main` and not with all
the potential further optimisations that might be possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]