This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 49656ec ARROW-4894: [Rust] [DataFusion] Remove all uses of panic! from aggregate.rs
49656ec is described below
commit 49656ec3511841d307a96e88c0420f344e7e86b4
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 15 12:16:31 2019 -0600
ARROW-4894: [Rust] [DataFusion] Remove all uses of panic! from aggregate.rs
Author: Andy Grove <[email protected]>
Closes #3920 from andygrove/ARROW-4894 and squashes the following commits:
ee41c82 <Andy Grove> Remove all uses of panic! from aggregate.rs
---
rust/datafusion/src/execution/aggregate.rs | 68 +++++++++++++++++++++---------
1 file changed, 49 insertions(+), 19 deletions(-)
diff --git a/rust/datafusion/src/execution/aggregate.rs b/rust/datafusion/src/execution/aggregate.rs
index befafe5..b81c87b 100644
--- a/rust/datafusion/src/execution/aggregate.rs
+++ b/rust/datafusion/src/execution/aggregate.rs
@@ -82,7 +82,7 @@ enum GroupByScalar {
trait AggregateFunction {
/// Get the function name (used for debugging)
fn name(&self) -> &str;
- fn accumulate_scalar(&mut self, value: &Option<ScalarValue>);
+ fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()>;
fn result(&self) -> &Option<ScalarValue>;
fn data_type(&self) -> &DataType;
}
@@ -107,7 +107,7 @@ impl AggregateFunction for MinFunction {
"min"
}
- fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) {
+ fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()> {
if self.value.is_none() {
self.value = value.clone();
} else if value.is_some() {
@@ -142,9 +142,14 @@ impl AggregateFunction for MinFunction {
(Some(ScalarValue::Float64(a)), Some(ScalarValue::Float64(b))) => {
Some(ScalarValue::Float64(a.min(*b)))
}
- _ => panic!("unsupported data type for MIN"),
+ _ => {
+ return Err(ExecutionError::ExecutionError(
+ "unsupported data type for MIN".to_string(),
+ ));
+ }
}
}
+ Ok(())
}
fn result(&self) -> &Option<ScalarValue> {
@@ -176,7 +181,7 @@ impl AggregateFunction for MaxFunction {
"max"
}
- fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) {
+ fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()> {
if self.value.is_none() {
self.value = value.clone();
} else if value.is_some() {
@@ -211,9 +216,14 @@ impl AggregateFunction for MaxFunction {
(Some(ScalarValue::Float64(a)), Some(ScalarValue::Float64(b))) => {
Some(ScalarValue::Float64(a.max(*b)))
}
- _ => panic!("unsupported data type for MAX"),
+ _ => {
+ return Err(ExecutionError::ExecutionError(
+ "unsupported data type for MAX".to_string(),
+ ));
+ }
}
}
+ Ok(())
}
fn result(&self) -> &Option<ScalarValue> {
@@ -245,7 +255,7 @@ impl AggregateFunction for SumFunction {
"sum"
}
- fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) {
+ fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()> {
if self.value.is_none() {
self.value = value.clone();
} else if value.is_some() {
@@ -280,9 +290,14 @@ impl AggregateFunction for SumFunction {
(Some(ScalarValue::Float64(a)), Some(ScalarValue::Float64(b))) => {
Some(ScalarValue::Float64(a + *b))
}
- _ => panic!("unsupported data type for SUM"),
+ _ => {
+ return Err(ExecutionError::ExecutionError(
+ "unsupported data type for SUM".to_string(),
+ ));
+ }
}
}
+ Ok(())
}
fn result(&self) -> &Option<ScalarValue> {
@@ -299,9 +314,9 @@ struct AccumulatorSet {
}
impl AccumulatorSet {
- fn accumulate_scalar(&mut self, i: usize, value: Option<ScalarValue>) {
+ fn accumulate_scalar(&mut self, i: usize, value: Option<ScalarValue>) -> Result<()> {
let mut accumulator = self.aggr_values[i].borrow_mut();
- accumulator.accumulate_scalar(&value);
+ accumulator.accumulate_scalar(&value)
}
fn values(&self) -> Vec<Option<ScalarValue>> {
@@ -552,7 +567,7 @@ fn update_accumulators(
row: usize,
accumulator_set: &mut AccumulatorSet,
aggr_expr: &Vec<RuntimeExpr>,
-) {
+) -> Result<()> {
// update the accumulators
for j in 0..accumulator_set.aggr_values.len() {
match &aggr_expr[j] {
@@ -615,16 +630,26 @@ fn update_accumulators(
.unwrap();
Some(ScalarValue::Float64(z.value(row)))
}
- _ => panic!(),
+ other => return Err(ExecutionError::ExecutionError(format!("Unsupported data type {:?} for result of aggregate expression", other))),
};
- accumulator_set.accumulate_scalar(j, value);
+ accumulator_set.accumulate_scalar(j, value)?;
+ }
+ Err(e) => {
+ return Err(ExecutionError::ExecutionError(format!(
+ "Failed to evaluate aggregate expression: {:?}",
+ e
+ )));
}
- _ => panic!(),
}
}
- _ => panic!(),
+ _ => {
+ return Err(ExecutionError::InternalError(
+ "Invalid aggregate expression in accumulator".to_string(),
+ ));
+ }
}
}
+ Ok(())
}
impl Relation for AggregateRelation {
@@ -729,11 +754,11 @@ impl AggregateRelation {
match args[0](&batch) {
Ok(array) => match f {
AggregateType::Min => accumulator_set
- .accumulate_scalar(i, array_min(array, &t)?),
+ .accumulate_scalar(i, array_min(array, &t)?)?,
AggregateType::Max => accumulator_set
- .accumulate_scalar(i, array_max(array, &t)?),
+ .accumulate_scalar(i, array_max(array, &t)?)?,
AggregateType::Sum => accumulator_set
- .accumulate_scalar(i, array_sum(array, &t)?),
+ .accumulate_scalar(i, array_sum(array, &t)?)?,
_ => {
return Err(ExecutionError::NotImplemented(
"Unsupported aggregate function".to_string(),
@@ -893,7 +918,7 @@ impl AggregateRelation {
row,
&mut accumulator_set,
&self.aggr_expr,
- );
+ )?;
true
}
None => false,
@@ -904,7 +929,12 @@ impl AggregateRelation {
Rc::new(RefCell::new(create_accumulators(&self.aggr_expr)?));
{
let mut entry_mut = accumulator_set.borrow_mut();
- update_accumulators(&batch, row, &mut entry_mut, &self.aggr_expr);
+ update_accumulators(
+ &batch,
+ row,
+ &mut entry_mut,
+ &self.aggr_expr,
+ )?;
}
map.insert(key.clone(), accumulator_set);
}