alamb commented on a change in pull request #2068:
URL: https://github.com/apache/arrow-datafusion/pull/2068#discussion_r835490088
##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+ /// Evaluate an expression against a RecordBatch with validity array
+ fn evaluate_selection(
+ &self,
+ batch: &RecordBatch,
+ selection: &BooleanArray,
+ ) -> Result<ColumnarValue> {
+ let mut indices = vec![];
+ for (i, b) in selection.iter().enumerate() {
Review comment:
Do you think it is worth optimizing away the `take` (`copy`) when
`selection` is all true?
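
To make the suggestion concrete, here is a minimal sketch of such a fast path. It models the selection as a plain slice of `Option<bool>` rather than a real `BooleanArray`, and `selected_indices` is a hypothetical helper name, not something in the PR:

```rust
/// Collect the indices of the selected rows, or return `None` when every
/// row is selected so the caller can skip the `take` and evaluate the
/// original batch directly.
///
/// Assumption: the selection is modeled as `&[Option<bool>]`; a null slot
/// (`None`) counts as "not selected", matching the `Some(true)` check in
/// the diff.
fn selected_indices(selection: &[Option<bool>]) -> Option<Vec<u64>> {
    // Fast path: nothing to filter out, so no copy is needed.
    if selection.iter().all(|b| *b == Some(true)) {
        return None;
    }
    // Slow path: keep only the positions where the mask is `Some(true)`.
    let indices = selection
        .iter()
        .enumerate()
        .filter_map(|(i, b)| if *b == Some(true) { Some(i as u64) } else { None })
        .collect();
    Some(indices)
}
```

With something like this, `evaluate_selection` could return `self.evaluate(batch)` right away when the helper yields `None`. Whether the extra pass over the mask pays off would need measuring.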
##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+ /// Evaluate an expression against a RecordBatch with validity array
+ fn evaluate_selection(
+ &self,
+ batch: &RecordBatch,
+ selection: &BooleanArray,
+ ) -> Result<ColumnarValue> {
+ let mut indices = vec![];
+ for (i, b) in selection.iter().enumerate() {
+ if let Some(true) = b {
+ indices.push(i as u64);
+ }
+ }
+ let indices = UInt64Array::from_iter_values(indices);
+ let tmp_columns = batch
+ .columns()
+ .iter()
+ .map(|c| {
+ take(c.as_ref(), &indices, None)
+ .map_err(|e| DataFusionError::Execution(e.to_string()))
Review comment:
I think there is an `ArrowError` variant here you could use too:
https://github.com/apache/arrow-datafusion/blob/master/datafusion-common/src/error.rs#L44
You may not even need `map_err` at all; maybe `?` alone is sufficient.
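
As a rough illustration of the idea, the sketch below uses stand-in types (`KernelError`, `EngineError`, `take_one`, and `take_many` are all hypothetical names, not the real Arrow or DataFusion API). It only shows the general Rust mechanism: once the outer error type implements `From` for the inner one, `?` performs the conversion and `map_err` becomes unnecessary.

```rust
/// Hypothetical stand-in for the error type returned by a compute kernel.
#[derive(Debug, PartialEq)]
struct KernelError(String);

/// Hypothetical stand-in for the engine error type, with a variant that
/// wraps the kernel error instead of flattening it to a string.
#[derive(Debug, PartialEq)]
enum EngineError {
    Kernel(KernelError),
}

/// This conversion is what lets `?` replace an explicit `map_err`.
impl From<KernelError> for EngineError {
    fn from(e: KernelError) -> Self {
        EngineError::Kernel(e)
    }
}

/// Toy "take" of a single value that fails on an out-of-bounds index.
fn take_one(values: &[i32], index: usize) -> Result<i32, KernelError> {
    values
        .get(index)
        .copied()
        .ok_or_else(|| KernelError(format!("index {} out of bounds", index)))
}

/// Inside the closure, `?` converts `KernelError` into `EngineError`
/// through the `From` impl above.
fn take_many(values: &[i32], indices: &[usize]) -> Result<Vec<i32>, EngineError> {
    indices
        .iter()
        .map(|&i| -> Result<i32, EngineError> { Ok(take_one(values, i)?) })
        .collect()
}
```

Wrapping the original error this way also keeps its structure, rather than reducing it to a message string.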
##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+ /// Evaluate an expression against a RecordBatch with validity array
+ fn evaluate_selection(
+ &self,
+ batch: &RecordBatch,
+ selection: &BooleanArray,
+ ) -> Result<ColumnarValue> {
+ let mut indices = vec![];
+ for (i, b) in selection.iter().enumerate() {
+ if let Some(true) = b {
+ indices.push(i as u64);
+ }
+ }
+ let indices = UInt64Array::from_iter_values(indices);
Review comment:
I assume you can't just update the `null` mask of the source batch to be
`null` where validity is `false` because things like the divide kernel will
still throw runtime exceptions if the data is 0?
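
To spell out the concern, here is a simplified model. It assumes a divide routine that inspects every raw value no matter how the row is flagged; the real kernel may behave differently, and both function names are hypothetical. Slices are assumed to have equal lengths.

```rust
/// Toy divide that looks at every raw slot. Assumption: like the kernel
/// described above, it errors on a zero divisor even if that row would
/// later be treated as null.
fn divide_ignoring_validity(lhs: &[i64], rhs: &[i64]) -> Result<Vec<i64>, String> {
    lhs.iter()
        .zip(rhs)
        .map(|(l, r)| {
            if *r == 0 {
                Err("divide by zero".to_string())
            } else {
                Ok(l / r)
            }
        })
        .collect()
}

/// Take the selected rows first, divide only those, then scatter the
/// results back to their original positions (unselected rows become null).
fn divide_selected(
    lhs: &[i64],
    rhs: &[i64],
    selection: &[bool],
) -> Result<Vec<Option<i64>>, String> {
    let idx: Vec<usize> = selection
        .iter()
        .enumerate()
        .filter(|(_, s)| **s)
        .map(|(i, _)| i)
        .collect();
    let l: Vec<i64> = idx.iter().map(|&i| lhs[i]).collect();
    let r: Vec<i64> = idx.iter().map(|&i| rhs[i]).collect();
    let dense = divide_ignoring_validity(&l, &r)?;

    let mut out = vec![None; selection.len()];
    for (&i, v) in idx.iter().zip(dense) {
        out[i] = Some(v);
    }
    Ok(out)
}
```

In this model, masking alone would still fail on the row whose divisor is 0, while filtering the rows out beforehand never shows that row to the divide routine.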
##########
File path: datafusion-physical-expr/src/physical_expr.rs
##########
@@ -38,4 +43,74 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug {
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
/// Evaluate an expression against a RecordBatch
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+ /// Evaluate an expression against a RecordBatch with validity array
+ fn evaluate_selection(
+ &self,
+ batch: &RecordBatch,
+ selection: &BooleanArray,
+ ) -> Result<ColumnarValue> {
+ let mut indices = vec![];
+ for (i, b) in selection.iter().enumerate() {
+ if let Some(true) = b {
+ indices.push(i as u64);
+ }
+ }
+ let indices = UInt64Array::from_iter_values(indices);
+ let tmp_columns = batch
+ .columns()
+ .iter()
+ .map(|c| {
+ take(c.as_ref(), &indices, None)
+ .map_err(|e| DataFusionError::Execution(e.to_string()))
+ })
+ .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+
+ let tmp_batch = RecordBatch::try_new(batch.schema(), tmp_columns)?;
+ let tmp_result = self.evaluate(&tmp_batch)?;
+ if let ColumnarValue::Array(a) = tmp_result {
+ let result = scatter(selection, a.as_ref())?;
+ Ok(ColumnarValue::Array(result))
+ } else {
+ Ok(tmp_result)
+ }
+ }
+}
+
+/// Scatter `truthy` array by boolean mask. When the mask evaluates `true`, next values of `truthy`
+/// are taken, when the mask evaluates `false` values null values are filled.
+///
+/// # Arguments
+/// * `mask` - Boolean values used to determine where to put the `truthy` values
+/// * `truthy` - All values of this array are to scatter according to `mask` into final result.
+fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
+ let truthy = truthy.data();
+
+ let mut mutable = MutableArrayData::new(vec![&*truthy], true, mask.len());
Review comment:
I am probably missing something here, but this code looks like it always
creates `BooleanArray`s even when `truthy` is some other type. In the
examples you have, the resulting expression is always boolean, but I wonder
whether that is always the case.
Perhaps it is worth an `assert!` that `truthy.data_type() == DataType::Boolean`?
Even better would be some unit tests showing how `scatter` works (for boolean
and non-boolean arrays).
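
For reference, this is the behavior I would expect such tests to pin down, sketched over plain `Vec<Option<T>>` instead of Arrow arrays. The generic `scatter` below is a hypothetical model, not the function in the PR, and it assumes a null mask slot is treated like `false`:

```rust
/// Model of `scatter`: walk the mask, and for each `Some(true)` slot emit
/// the next value from `truthy`; every other slot becomes null (`None`).
///
/// Assumptions: a null mask entry behaves like `false`, and if `truthy`
/// runs out early the remaining slots are filled with nulls.
fn scatter<T: Clone>(mask: &[Option<bool>], truthy: &[Option<T>]) -> Vec<Option<T>> {
    let mut next = truthy.iter();
    mask.iter()
        .map(|m| match m {
            // Consume the next `truthy` value, preserving its own nullness.
            Some(true) => next.next().cloned().flatten(),
            // `Some(false)` and `None` both produce a null output slot.
            _ => None,
        })
        .collect()
}
```

A non-boolean case would check that `scatter(&[Some(true), Some(false), None, Some(true)], &[Some(1), Some(2)])` places the two values at positions 0 and 3 with nulls in between; a boolean case would do the same with `Some(true)`/`None` values in `truthy`.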