This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c5c1dae36 Review `unwrap` and `panic` from the `aggregate` directory
of `datafusion-physical-expr` (#3443)
c5c1dae36 is described below
commit c5c1dae36b45b27bec27cf42862aa5ce32e0d6b5
Author: Ian Alexander Joiner <[email protected]>
AuthorDate: Mon Sep 12 09:21:24 2022 -0400
Review `unwrap` and `panic` from the `aggregate` directory of
`datafusion-physical-expr` (#3443)
* approx agg
* expand downcast_value
* Update datafusion/common/src/lib.rs
* Update datafusion/common/src/lib.rs
* agg almost done
* agg done
Co-authored-by: Andy Grove <[email protected]>
---
datafusion/common/src/error.rs | 16 ++++++++++
datafusion/common/src/lib.rs | 4 +--
.../physical-expr/src/aggregate/covariance.rs | 36 ++++++++++++----------
datafusion/physical-expr/src/aggregate/min_max.rs | 12 ++++----
datafusion/physical-expr/src/aggregate/mod.rs | 7 +++--
datafusion/physical-expr/src/aggregate/sum.rs | 8 ++---
.../physical-expr/src/aggregate/sum_distinct.rs | 6 ++--
datafusion/physical-expr/src/aggregate/variance.rs | 16 ++++------
8 files changed, 61 insertions(+), 44 deletions(-)
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 47a63b320..9157e04a2 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -363,3 +363,19 @@ macro_rules! internal_err {
Err(DataFusionError::Internal(format!($($arg)*)))
};
}
+
+/// Unwrap an `Option` if possible. Otherwise return an
`DataFusionError::Internal`.
+/// In normal usage of DataFusion the unwrap should always succeed.
+///
+/// Example: `let values = unwrap_or_internal_err!(values)`
+#[macro_export]
+macro_rules! unwrap_or_internal_err {
+ ($Value: ident) => {
+ $Value.ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "{} should not be None",
+ stringify!($Value)
+ ))
+ })?
+ };
+}
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index b2ff66af0..2992596c5 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -28,8 +28,8 @@ pub use dfschema::{DFField, DFSchema, DFSchemaRef,
ExprSchema, ToDFSchema};
pub use error::{field_not_found, DataFusionError, Result, SchemaError};
pub use scalar::{ScalarType, ScalarValue};
-/// Downcast an Arrow Array to a concrete type, return an `Err` if the cast is
-/// not possible.
+/// Downcast an Arrow Array to a concrete type, return an
`DataFusionError::Internal` if the cast is
+/// not possible. In normal usage of DataFusion the downcast should always
succeed.
///
/// Example: `let array = downcast_value!(values, Int32Array)`
#[macro_export]
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs
b/datafusion/physical-expr/src/aggregate/covariance.rs
index e54edce14..d621b3bad 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -29,7 +29,7 @@ use arrow::{
datatypes::DataType,
datatypes::Field,
};
-use datafusion_common::{downcast_value, ScalarValue};
+use datafusion_common::{downcast_value, unwrap_or_internal_err, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Accumulator, AggregateState};
@@ -266,29 +266,31 @@ impl Accumulator for CovarianceAccumulator {
"The two columns are not aligned".to_string(),
));
}
+ } else {
+ let value1 = unwrap_or_internal_err!(value1);
+ let value2 = unwrap_or_internal_err!(value2);
+ let new_count = self.count + 1;
+ let delta1 = value1 - self.mean1;
+ let new_mean1 = delta1 / new_count as f64 + self.mean1;
+ let delta2 = value2 - self.mean2;
+ let new_mean2 = delta2 / new_count as f64 + self.mean2;
+ let new_c = delta1 * (value2 - new_mean2) + self.algo_const;
+
+ self.count += 1;
+ self.mean1 = new_mean1;
+ self.mean2 = new_mean2;
+ self.algo_const = new_c;
}
-
- let new_count = self.count + 1;
- let delta1 = value1.unwrap() - self.mean1;
- let new_mean1 = delta1 / new_count as f64 + self.mean1;
- let delta2 = value2.unwrap() - self.mean2;
- let new_mean2 = delta2 / new_count as f64 + self.mean2;
- let new_c = delta1 * (value2.unwrap() - new_mean2) +
self.algo_const;
-
- self.count += 1;
- self.mean1 = new_mean1;
- self.mean2 = new_mean2;
- self.algo_const = new_c;
}
Ok(())
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
- let means1 =
states[1].as_any().downcast_ref::<Float64Array>().unwrap();
- let means2 =
states[2].as_any().downcast_ref::<Float64Array>().unwrap();
- let cs = states[3].as_any().downcast_ref::<Float64Array>().unwrap();
+ let counts = downcast_value!(states[0], UInt64Array);
+ let means1 = downcast_value!(states[1], Float64Array);
+ let means2 = downcast_value!(states[2], Float64Array);
+ let cs = downcast_value!(states[3], Float64Array);
for i in 0..counts.len() {
let c = counts.value(i);
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs
b/datafusion/physical-expr/src/aggregate/min_max.rs
index 0c7a24a16..3f2b698b8 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -17,7 +17,7 @@
//! Defines physical expressions that can evaluated at runtime during query
execution
-use std::any::Any;
+use std::any::{type_name, Any};
use std::convert::TryFrom;
use std::sync::Arc;
@@ -35,7 +35,7 @@ use arrow::{
datatypes::Field,
};
use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{downcast_value, DataFusionError, Result};
use datafusion_expr::{Accumulator, AggregateState};
use crate::aggregate::row_accumulator::RowAccumulator;
@@ -145,7 +145,7 @@ impl AggregateExpr for Max {
// Statically-typed version of min/max(array) -> ScalarValue for string types.
macro_rules! typed_min_max_batch_string {
($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
- let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ let array = downcast_value!($VALUES, $ARRAYTYPE);
let value = compute::$OP(array);
let value = value.and_then(|e| Some(e.to_string()));
ScalarValue::$SCALAR(value)
@@ -155,13 +155,13 @@ macro_rules! typed_min_max_batch_string {
// Statically-typed version of min/max(array) -> ScalarValue for non-string
types.
macro_rules! typed_min_max_batch {
($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{
- let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ let array = downcast_value!($VALUES, $ARRAYTYPE);
let value = compute::$OP(array);
ScalarValue::$SCALAR(value)
}};
($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident, $TZ:expr) => {{
- let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ let array = downcast_value!($VALUES, $ARRAYTYPE);
let value = compute::$OP(array);
ScalarValue::$SCALAR(value, $TZ.clone())
}};
@@ -176,7 +176,7 @@ macro_rules! typed_min_max_batch_decimal128 {
if null_count == $VALUES.len() {
ScalarValue::Decimal128(None, *$PRECISION, *$SCALE)
} else {
- let array =
$VALUES.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ let array = downcast_value!($VALUES, Decimal128Array);
if null_count == 0 {
// there is no null value
let mut result = array.value(0);
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs
b/datafusion/physical-expr/src/aggregate/mod.rs
index a8d59d714..ec338eb68 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -18,7 +18,7 @@
use crate::aggregate::row_accumulator::RowAccumulator;
use crate::PhysicalExpr;
use arrow::datatypes::Field;
-use datafusion_common::Result;
+use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Accumulator;
use std::any::Any;
use std::fmt::Debug;
@@ -97,6 +97,9 @@ pub trait AggregateExpr: Send + Sync + Debug {
&self,
_start_index: usize,
) -> Result<Box<dyn RowAccumulator>> {
- unreachable!()
+ Err(DataFusionError::NotImplemented(format!(
+ "RowAccumulator hasn't been implemented for {:?} yet",
+ self
+ )))
}
}
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs
b/datafusion/physical-expr/src/aggregate/sum.rs
index 181444b41..0841cb6db 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -17,7 +17,7 @@
//! Defines physical expressions that can evaluated at runtime during query
execution
-use std::any::Any;
+use std::any::{type_name, Any};
use std::convert::TryFrom;
use std::sync::Arc;
@@ -31,7 +31,7 @@ use arrow::{
},
datatypes::Field,
};
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion_expr::{Accumulator, AggregateState};
use crate::aggregate::row_accumulator::RowAccumulator;
@@ -144,7 +144,7 @@ impl SumAccumulator {
// returns the new value after sum with the new values, taking nullability
into account
macro_rules! typed_sum_delta_batch {
($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
- let array = $VALUES.as_any().downcast_ref::<$ARRAYTYPE>().unwrap();
+ let array = downcast_value!($VALUES, $ARRAYTYPE);
let delta = compute::sum(array);
ScalarValue::$SCALAR(delta)
}};
@@ -153,7 +153,7 @@ macro_rules! typed_sum_delta_batch {
// TODO implement this in arrow-rs with simd
// https://github.com/apache/arrow-rs/issues/1010
fn sum_decimal_batch(values: &ArrayRef, precision: u8, scale: u8) ->
Result<ScalarValue> {
- let array = values.as_any().downcast_ref::<Decimal128Array>().unwrap();
+ let array = downcast_value!(values, Decimal128Array);
if array.null_count() == array.len() {
return Ok(ScalarValue::Decimal128(None, precision, scale));
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index d2ab46bdb..5f9738f68 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -171,9 +171,9 @@ impl Accumulator for DistinctSumAccumulator {
fn evaluate(&self) -> Result<ScalarValue> {
let mut sum_value = ScalarValue::try_from(&self.data_type)?;
- self.hash_values.iter().for_each(|distinct_value| {
- sum_value = sum::sum(&sum_value, distinct_value).unwrap()
- });
+ for distinct_value in self.hash_values.iter() {
+ sum_value = sum::sum(&sum_value, distinct_value)?;
+ }
Ok(sum_value)
}
}
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs
b/datafusion/physical-expr/src/aggregate/variance.rs
index 4ff4318e3..dde85f414 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -17,7 +17,7 @@
//! Defines physical expressions that can evaluated at runtime during query
execution
-use std::any::Any;
+use std::any::{type_name, Any};
use std::sync::Arc;
use crate::aggregate::stats::StatsType;
@@ -30,6 +30,7 @@ use arrow::{
datatypes::DataType,
datatypes::Field,
};
+use datafusion_common::downcast_value;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Accumulator, AggregateState};
@@ -220,12 +221,7 @@ impl Accumulator for VarianceAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &cast(&values[0], &DataType::Float64)?;
- let arr = values
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap()
- .iter()
- .flatten();
+ let arr = downcast_value!(values, Float64Array).iter().flatten();
for value in arr {
let new_count = self.count + 1;
@@ -243,9 +239,9 @@ impl Accumulator for VarianceAccumulator {
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
- let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
- let means = states[1].as_any().downcast_ref::<Float64Array>().unwrap();
- let m2s = states[2].as_any().downcast_ref::<Float64Array>().unwrap();
+ let counts = downcast_value!(states[0], UInt64Array);
+ let means = downcast_value!(states[1], Float64Array);
+ let m2s = downcast_value!(states[2], Float64Array);
for i in 0..counts.len() {
let c = counts.value(i);