pepijnve commented on code in PR #18152:
URL: https://github.com/apache/datafusion/pull/18152#discussion_r2455874629
##########
datafusion/physical-expr/src/expressions/case.rs:
##########
@@ -122,6 +123,181 @@ fn is_cheap_and_infallible(expr: &Arc<dyn PhysicalExpr>) -> bool {
expr.as_any().is::<Column>()
}
+/// Creates a [FilterPredicate] from a boolean array.
+fn create_filter(predicate: &BooleanArray, optimize: bool) -> FilterPredicate {
+ let mut filter_builder = FilterBuilder::new(predicate);
+ if optimize {
+ filter_builder = filter_builder.optimize();
+ }
+ filter_builder.build()
+}
+
+fn filter_record_batch(
+ record_batch: &RecordBatch,
+ filter: &FilterPredicate,
+) -> std::result::Result<RecordBatch, ArrowError> {
+ let filtered_columns = record_batch
+ .columns()
+ .iter()
+ .map(|a| filter_array(a, filter))
+ .collect::<std::result::Result<Vec<_>, _>>()?;
+ unsafe {
+ Ok(RecordBatch::new_unchecked(
+ record_batch.schema(),
+ filtered_columns,
+ filter.count(),
+ ))
+ }
+}
+
+#[inline(always)]
+fn filter_array(
+ array: &dyn Array,
+ filter: &FilterPredicate,
+) -> std::result::Result<ArrayRef, ArrowError> {
+ filter.filter(array)
+}
+
+struct ResultBuilder {
+ data_type: DataType,
+ // A Vec of partial results that should be merged. `partial_result_indices` contains
+ // indexes into this vec.
+ partial_results: Vec<ArrayData>,
+ // Indicates per result row from which array in `partial_results` a value should be taken.
+ // The indexes in this array are offset by +1. The special value 0 indicates null values.
+ partial_result_indices: Vec<usize>,
+ // An optional result that is the covering result for all rows.
+ // This is used as an optimisation to avoid the cost of merging when all rows
+ // evaluate to the same case branch.
+ covering_result: Option<ColumnarValue>,
+}
+
+impl ResultBuilder {
+ fn new(data_type: &DataType, capacity: usize) -> Self {
+ Self {
+ data_type: data_type.clone(),
+ partial_result_indices: vec![0; capacity],
+ partial_results: vec![],
+ covering_result: None,
+ }
+ }
+
+ /// Adds a result value.
+ ///
+ /// `rows` should be a [UInt32Array] containing [RecordBatch] relative row indices
+ /// for which `value` contains result values.
+ ///
+ /// If `value` is a scalar, the scalar value is used for each row in `rows`.
+ /// If `value` is an array, the values from the array and the indices from `rows` will be
+ /// processed pairwise.
+ fn add_result(&mut self, rows: &ArrayRef, value: ColumnarValue) ->
Result<()> {
+ match value {
+ ColumnarValue::Array(a) => {
+ assert_eq!(a.len(), rows.len());
+ if rows.len() == self.partial_result_indices.len() {
+ self.set_covering_result(ColumnarValue::Array(a));
+ } else {
+ self.add_partial_result(rows, a.to_data());
+ }
+ }
+ ColumnarValue::Scalar(s) => {
+ if rows.len() == self.partial_result_indices.len() {
+ self.set_covering_result(ColumnarValue::Scalar(s));
+ } else {
+ self.add_partial_result(
+ rows,
+ s.to_array_of_size(rows.len())?.to_data(),
+ );
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {
Review Comment:
Isn't that just going to be repeating the comments from `add_branch_result`?
I don't think it's very useful to repeat the same thing over and over again.
It's not like this is public API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]