This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 49656ec  ARROW-4894: [Rust] [DataFusion] Remove all uses of panic! from aggregate.rs
49656ec is described below

commit 49656ec3511841d307a96e88c0420f344e7e86b4
Author: Andy Grove <[email protected]>
AuthorDate: Fri Mar 15 12:16:31 2019 -0600

    ARROW-4894: [Rust] [DataFusion] Remove all uses of panic! from aggregate.rs
    
    Author: Andy Grove <[email protected]>
    
    Closes #3920 from andygrove/ARROW-4894 and squashes the following commits:
    
    ee41c82 <Andy Grove> Remove all uses of panic! from aggregate.rs
---
 rust/datafusion/src/execution/aggregate.rs | 68 +++++++++++++++++++++---------
 1 file changed, 49 insertions(+), 19 deletions(-)

diff --git a/rust/datafusion/src/execution/aggregate.rs b/rust/datafusion/src/execution/aggregate.rs
index befafe5..b81c87b 100644
--- a/rust/datafusion/src/execution/aggregate.rs
+++ b/rust/datafusion/src/execution/aggregate.rs
@@ -82,7 +82,7 @@ enum GroupByScalar {
 trait AggregateFunction {
     /// Get the function name (used for debugging)
     fn name(&self) -> &str;
-    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>);
+    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()>;
     fn result(&self) -> &Option<ScalarValue>;
     fn data_type(&self) -> &DataType;
 }
@@ -107,7 +107,7 @@ impl AggregateFunction for MinFunction {
         "min"
     }
 
-    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) {
+    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()> {
         if self.value.is_none() {
             self.value = value.clone();
         } else if value.is_some() {
@@ -142,9 +142,14 @@ impl AggregateFunction for MinFunction {
                 (Some(ScalarValue::Float64(a)), Some(ScalarValue::Float64(b))) => {
                     Some(ScalarValue::Float64(a.min(*b)))
                 }
-                _ => panic!("unsupported data type for MIN"),
+                _ => {
+                    return Err(ExecutionError::ExecutionError(
+                        "unsupported data type for MIN".to_string(),
+                    ));
+                }
             }
         }
+        Ok(())
     }
 
     fn result(&self) -> &Option<ScalarValue> {
@@ -176,7 +181,7 @@ impl AggregateFunction for MaxFunction {
         "max"
     }
 
-    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) {
+    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()> {
         if self.value.is_none() {
             self.value = value.clone();
         } else if value.is_some() {
@@ -211,9 +216,14 @@ impl AggregateFunction for MaxFunction {
                 (Some(ScalarValue::Float64(a)), Some(ScalarValue::Float64(b))) => {
                     Some(ScalarValue::Float64(a.max(*b)))
                 }
-                _ => panic!("unsupported data type for MAX"),
+                _ => {
+                    return Err(ExecutionError::ExecutionError(
+                        "unsupported data type for MAX".to_string(),
+                    ));
+                }
             }
         }
+        Ok(())
     }
 
     fn result(&self) -> &Option<ScalarValue> {
@@ -245,7 +255,7 @@ impl AggregateFunction for SumFunction {
         "sum"
     }
 
-    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) {
+    fn accumulate_scalar(&mut self, value: &Option<ScalarValue>) -> Result<()> {
         if self.value.is_none() {
             self.value = value.clone();
         } else if value.is_some() {
@@ -280,9 +290,14 @@ impl AggregateFunction for SumFunction {
                 (Some(ScalarValue::Float64(a)), Some(ScalarValue::Float64(b))) => {
                     Some(ScalarValue::Float64(a + *b))
                 }
-                _ => panic!("unsupported data type for SUM"),
+                _ => {
+                    return Err(ExecutionError::ExecutionError(
+                        "unsupported data type for SUM".to_string(),
+                    ));
+                }
             }
         }
+        Ok(())
     }
 
     fn result(&self) -> &Option<ScalarValue> {
@@ -299,9 +314,9 @@ struct AccumulatorSet {
 }
 
 impl AccumulatorSet {
-    fn accumulate_scalar(&mut self, i: usize, value: Option<ScalarValue>) {
+    fn accumulate_scalar(&mut self, i: usize, value: Option<ScalarValue>) -> Result<()> {
         let mut accumulator = self.aggr_values[i].borrow_mut();
-        accumulator.accumulate_scalar(&value);
+        accumulator.accumulate_scalar(&value)
     }
 
     fn values(&self) -> Vec<Option<ScalarValue>> {
@@ -552,7 +567,7 @@ fn update_accumulators(
     row: usize,
     accumulator_set: &mut AccumulatorSet,
     aggr_expr: &Vec<RuntimeExpr>,
-) {
+) -> Result<()> {
     // update the accumulators
     for j in 0..accumulator_set.aggr_values.len() {
         match &aggr_expr[j] {
@@ -615,16 +630,26 @@ fn update_accumulators(
                                     .unwrap();
                                 Some(ScalarValue::Float64(z.value(row)))
                             }
-                            _ => panic!(),
+                            other => return Err(ExecutionError::ExecutionError(format!("Unsupported data type {:?} for result of aggregate expression", other))),
                         };
-                        accumulator_set.accumulate_scalar(j, value);
+                        accumulator_set.accumulate_scalar(j, value)?;
+                    }
+                    Err(e) => {
+                        return Err(ExecutionError::ExecutionError(format!(
+                            "Failed to evaluate aggregate expression: {:?}",
+                            e
+                        )));
                     }
-                    _ => panic!(),
                 }
             }
-            _ => panic!(),
+            _ => {
+                return Err(ExecutionError::InternalError(
+                    "Invalid aggregate expression in accumulator".to_string(),
+                ));
+            }
         }
     }
+    Ok(())
 }
 
 impl Relation for AggregateRelation {
@@ -729,11 +754,11 @@ impl AggregateRelation {
                         match args[0](&batch) {
                             Ok(array) => match f {
                                 AggregateType::Min => accumulator_set
-                                    .accumulate_scalar(i, array_min(array, &t)?),
+                                    .accumulate_scalar(i, array_min(array, &t)?)?,
                                 AggregateType::Max => accumulator_set
-                                    .accumulate_scalar(i, array_max(array, &t)?),
+                                    .accumulate_scalar(i, array_max(array, &t)?)?,
                                 AggregateType::Sum => accumulator_set
-                                    .accumulate_scalar(i, array_sum(array, &t)?),
+                                    .accumulate_scalar(i, array_sum(array, &t)?)?,
                                 _ => {
                                     return Err(ExecutionError::NotImplemented(
                                         "Unsupported aggregate function".to_string(),
@@ -893,7 +918,7 @@ impl AggregateRelation {
                             row,
                             &mut accumulator_set,
                             &self.aggr_expr,
-                        );
+                        )?;
                         true
                     }
                     None => false,
@@ -904,7 +929,12 @@ impl AggregateRelation {
                         
Rc::new(RefCell::new(create_accumulators(&self.aggr_expr)?));
                     {
                         let mut entry_mut = accumulator_set.borrow_mut();
-                        update_accumulators(&batch, row, &mut entry_mut, &self.aggr_expr);
+                        update_accumulators(
+                            &batch,
+                            row,
+                            &mut entry_mut,
+                            &self.aggr_expr,
+                        )?;
                     }
                     map.insert(key.clone(), accumulator_set);
                 }

Reply via email to