This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new efbd1043d4 Refactor ScalarValue::new_primitive to return Result (#7830)
efbd1043d4 is described below
commit efbd1043d427ebf1783d164c90e45aee1766f12e
Author: Eugene Marushchenko <[email protected]>
AuthorDate: Wed Oct 18 00:38:00 2023 +1000
Refactor ScalarValue::new_primitive to return Result (#7830)
Co-authored-by: Evgeny Maruschenko <[email protected]>
---
datafusion/common/src/scalar.rs | 8 ++++----
.../physical-expr/src/aggregate/bit_and_or_xor.rs | 12 ++++++------
datafusion/physical-expr/src/aggregate/median.rs | 6 +++---
datafusion/physical-expr/src/aggregate/sum.rs | 4 ++--
datafusion/physical-expr/src/aggregate/sum_distinct.rs | 17 +++++++++--------
5 files changed, 24 insertions(+), 23 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 2d47b3e314..2c3dd4c5ca 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -744,7 +744,7 @@ macro_rules! eq_array_primitive {
}
impl ScalarValue {
- /// Create a [`ScalarValue`] with the provided value and datatype
+ /// Create a [`Result<ScalarValue>`] with the provided value and datatype
///
/// # Panics
///
@@ -752,13 +752,13 @@ impl ScalarValue {
pub fn new_primitive<T: ArrowPrimitiveType>(
a: Option<T::Native>,
d: &DataType,
- ) -> Self {
+ ) -> Result<Self> {
match a {
- None => d.try_into().unwrap(),
+ None => d.try_into(),
Some(v) => {
let array = PrimitiveArray::<T>::new(vec![v].into(), None)
.with_data_type(d.clone());
- Self::try_from_array(&array, 0).unwrap()
+ Self::try_from_array(&array, 0)
}
}
}
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
index d7934e79c3..6c97d62061 100644
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
@@ -195,7 +195,7 @@ where
}
fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}
fn size(&self) -> usize {
@@ -356,7 +356,7 @@ where
}
fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}
fn size(&self) -> usize {
@@ -517,7 +517,7 @@ where
}
fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}
fn size(&self) -> usize {
@@ -638,11 +638,11 @@ where
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
- let values: Vec<ScalarValue> = self
+ let values = self
.values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &T::DATA_TYPE))
- .collect();
+ .collect::<Result<Vec<_>>>()?;
let arr = ScalarValue::new_list(&values, &T::DATA_TYPE);
vec![ScalarValue::List(arr)]
@@ -685,7 +685,7 @@ where
acc = acc ^ *distinct_value;
}
let v = (!self.values.is_empty()).then_some(acc);
- Ok(ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE))
+ ScalarValue::new_primitive::<T>(v, &T::DATA_TYPE)
}
fn size(&self) -> usize {
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 477dcadcee..691b1c1752 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -146,11 +146,11 @@ impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {
impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
fn state(&self) -> Result<Vec<ScalarValue>> {
- let all_values: Vec<ScalarValue> = self
+ let all_values = self
.all_values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &self.data_type))
- .collect();
+ .collect::<Result<Vec<_>>>()?;
let arr = ScalarValue::new_list(&all_values, &self.data_type);
Ok(vec![ScalarValue::List(arr)])
@@ -188,7 +188,7 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
let (_, median, _) = d.select_nth_unstable_by(len / 2, cmp);
Some(*median)
};
- Ok(ScalarValue::new_primitive::<T>(median, &self.data_type))
+ ScalarValue::new_primitive::<T>(median, &self.data_type)
}
fn size(&self) -> usize {
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 5cc8e93332..d6c23d0dfa 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -205,7 +205,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
}
fn evaluate(&self) -> Result<ScalarValue> {
- Ok(ScalarValue::new_primitive::<T>(self.sum, &self.data_type))
+ ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
}
fn size(&self) -> usize {
@@ -265,7 +265,7 @@ impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {
fn evaluate(&self) -> Result<ScalarValue> {
let v = (self.count != 0).then_some(self.sum);
- Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
+ ScalarValue::new_primitive::<T>(v, &self.data_type)
}
fn size(&self) -> usize {
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 742e24b99e..ef1bd039a5 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -159,13 +159,14 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
- let mut distinct_values = Vec::new();
- self.values.iter().for_each(|distinct_value| {
- distinct_values.push(ScalarValue::new_primitive::<T>(
- Some(distinct_value.0),
- &self.data_type,
- ))
- });
+ let distinct_values = self
+ .values
+ .iter()
+ .map(|value| {
+ ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type)
+ })
+ .collect::<Result<Vec<_>>>()?;
+
vec![ScalarValue::List(ScalarValue::new_list(
&distinct_values,
&self.data_type,
@@ -206,7 +207,7 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
acc = acc.add_wrapping(distinct_value.0)
}
let v = (!self.values.is_empty()).then_some(acc);
- Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
+ ScalarValue::new_primitive::<T>(v, &self.data_type)
}
fn size(&self) -> usize {