alamb commented on code in PR #7629:
URL: https://github.com/apache/arrow-datafusion/pull/7629#discussion_r1352868464
##########
datafusion/common/src/utils.rs:
##########
@@ -334,6 +336,18 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
count
}
+/// Wrap an array into a ListArray
Review Comment:
```suggestion
/// Wrap an array into a single element `ListArray`. For example `[1, 2, 3]`
/// would be converted into `[[1, 2, 3]]`
///
```
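For reference, a minimal standalone sketch of the single-element wrapping described above, built with `ListArray::new` and `OffsetBuffer::from_lengths` (variable names are illustrative, not from the PR):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Field};

fn main() {
    // `[1, 2, 3]` becomes the single list element of `[[1, 2, 3]]`.
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let field = Arc::new(Field::new("item", DataType::Int32, true));
    // One list entry whose length is the whole child array.
    let offsets = OffsetBuffer::from_lengths([values.len()]);
    let list = ListArray::new(field, offsets, values, None);
    assert_eq!(list.len(), 1);
    assert_eq!(list.value(0).len(), 3);
}
```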
##########
datafusion/common/src/scalar.rs:
##########
@@ -316,14 +318,8 @@ impl PartialOrd for ScalarValue {
}
}
(Fixedsizelist(_, _, _), _) => None,
- (List(v1, t1), List(v2, t2)) => {
- if t1.eq(t2) {
- v1.partial_cmp(v2)
- } else {
- None
- }
- }
- (List(_, _), _) => None,
+ (List(_), List(_)) => todo!("ArrayRef does not have PartialOrd yet"),
Review Comment:
Perhaps this can call one of the comparison kernels, like `lt`:
https://docs.rs/arrow/latest/arrow/compute/kernels/cmp/fn.lt.html
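For reference, roughly how that kernel is called (untested sketch on primitive arrays; whether the `cmp` kernels accept `List` inputs directly here would need checking):

```rust
use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::kernels::cmp::lt;

fn main() {
    // Element-wise `<`; a PartialOrd for ScalarValue::List could potentially
    // delegate comparisons to kernels like this rather than todo!().
    let a = Int32Array::from(vec![1, 2, 3]);
    let b = Int32Array::from(vec![3, 2, 1]);
    let result: BooleanArray = lt(&a, &b).unwrap();
    assert_eq!(result, BooleanArray::from(vec![true, false, false]));
}
```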
##########
datafusion/common/src/scalar.rs:
##########
@@ -1653,42 +1619,66 @@ impl ScalarValue {
Ok(array)
}
- fn iter_to_array_list(
- scalars: impl IntoIterator<Item = ScalarValue>,
+ // This function does not contains nulls but empty array instead.
Review Comment:
what does "function not contain nulls" mean?
##########
datafusion/common/src/scalar.rs:
##########
@@ -1750,6 +1741,83 @@ impl ScalarValue {
.unwrap()
}
+ /// Converts `Vec<ScalaValue>` to ListArray, simplified version of ScalarValue::to_array
Review Comment:
👍 ❤️
##########
datafusion/common/src/scalar.rs:
##########
@@ -95,8 +97,8 @@ pub enum ScalarValue {
LargeBinary(Option<Vec<u8>>),
/// Fixed size list of nested ScalarValue
Fixedsizelist(Option<Vec<ScalarValue>>, FieldRef, i32),
- /// List of nested ScalarValue
- List(Option<Vec<ScalarValue>>, FieldRef),
+ /// ListArray wrapper
Review Comment:
```suggestion
/// Represents a single element of a [`ListArray`] as an [`ArrayRef`]
```
##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -252,109 +248,107 @@ mod tests {
accum1.update_batch(&[input1])?;
accum2.update_batch(&[input2])?;
- let state = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
- accum1.merge_batch(&state)?;
+ let array = accum2.state()?[0].raw_data()?;
+ accum1.merge_batch(&[array])?;
let actual = accum1.evaluate()?;
- compare_list_contents(expected, actual)
+ compare_list_contents(expected_values, actual)
+ }
+
+ // Since we dont have a way to sort Array easily, we just check all the possible outputs.
Review Comment:
Perhaps we can use [`arrow::compute::kernels::sort::lexsort`](https://docs.rs/arrow/latest/arrow/compute/kernels/sort/fn.lexsort.html)?
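Sketch of what that could look like in the test, assuming the accumulator output is available as an `ArrayRef` (names here are illustrative, not from the PR):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::compute::kernels::sort::{lexsort, SortColumn};

fn main() {
    // Sort the actual output before comparing it with the expected values,
    // instead of enumerating every possible ordering.
    let actual: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
    let sorted = lexsort(
        &[SortColumn {
            values: actual,
            options: None,
        }],
        None,
    )
    .unwrap();

    let sorted_ints = sorted[0].as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(sorted_ints.iter().flatten().collect::<Vec<_>>(), vec![1, 2, 3]);
}
```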
##########
datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs:
##########
@@ -637,13 +638,14 @@ where
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
- let values = self
+ let values: Vec<ScalarValue> = self
.values
.iter()
.map(|x| ScalarValue::new_primitive::<T>(Some(*x), &T::DATA_TYPE))
.collect();
- vec![ScalarValue::new_list(Some(values), T::DATA_TYPE)]
+ let arr = ScalarValue::scalars_to_list_array(&values, &T::DATA_TYPE);
Review Comment:
What do you think about keeping `ScalarValue::new_list(values)` as the API?
It seems easier to use than `scalars_to_list_array` 🤔
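Something like this thin wrapper is roughly what I had in mind (untested sketch, not compiled against this branch; the signature and return type of the new `scalars_to_list_array` helper are assumed from the diff and doc comment above):

```rust
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

// Hypothetical convenience wrapper keeping the old call-site ergonomics;
// `scalars_to_list_array` is the helper added in this PR and its return
// type is assumed here to be a ListArray.
pub fn new_list(values: &[ScalarValue], data_type: &DataType) -> ScalarValue {
    let arr = ScalarValue::scalars_to_list_array(values, data_type);
    ScalarValue::List(Arc::new(arr))
}
```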
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -282,39 +283,40 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
impl OrderSensitiveArrayAggAccumulator {
fn convert_array_agg_to_orderings(
&self,
- in_data: ScalarValue,
- ) -> Result<Vec<Vec<ScalarValue>>> {
- if let ScalarValue::List(Some(list_vals), _field_ref) = in_data {
- list_vals.into_iter().map(|struct_vals| {
- if let ScalarValue::Struct(Some(orderings), _fields) = struct_vals {
- Ok(orderings)
- } else {
- exec_err!(
- "Expects to receive ScalarValue::Struct(Some(..), _)
but got:{:?}",
- struct_vals.data_type()
- )
- }
- }).collect::<Result<Vec<_>>>()
- } else {
- exec_err!(
- "Expects to receive ScalarValue::List(Some(..), _) but
got:{:?}",
- in_data.data_type()
- )
+ array_agg: Vec<Vec<ScalarValue>>,
+ ) -> Result<Vec<Vec<Vec<ScalarValue>>>> {
Review Comment:
Would it be possible to document what the three levels of Vec mean semantically?
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -424,59 +427,6 @@ fn scalar_values_error_serialization() {
Some(vec![]),
vec![Field::new("item", DataType::Int16, true)].into(),
),
- // Should fail due to inconsistent types in the list
Review Comment:
why were these tests removed? Do they pass now? If so, perhaps we can move
them to the positive cases?
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -602,32 +551,6 @@ fn round_trip_scalar_values() {
i64::MAX,
))),
ScalarValue::IntervalMonthDayNano(None),
- ScalarValue::new_list(
Review Comment:
Perhaps we can add a comment here about "see `round_trip_scalar_list` for `ScalarValue::List` tests".
Though I don't understand why we need a separate test for ListArray / Handle. It seems like it
would be easier to understand the ListArray coverage if we kept the test structure as is 🤔
##########
datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs:
##########
@@ -392,8 +395,11 @@ impl<'a> ConstEvaluator<'a> {
"Could not evaluate the expression, found a result of
length {}",
a.len()
)
+ } else if as_list_array(&a).is_ok() || as_large_list_array(&a).is_ok() {
+ Ok(ScalarValue::List(a))
Review Comment:
👍 nice
##########
datafusion/common/src/scalar.rs:
##########
@@ -1872,35 +1940,12 @@ impl ScalarValue {
ScalarValue::Fixedsizelist(..) => {
unimplemented!("FixedSizeList is not supported yet")
}
- ScalarValue::List(values, field) => Arc::new(match field.data_type() {
- DataType::Boolean => build_list!(BooleanBuilder, Boolean, values, size),
- DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
- DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
- DataType::Int32 => build_list!(Int32Builder, Int32, values, size),
- DataType::Int64 => build_list!(Int64Builder, Int64, values, size),
- DataType::UInt8 => build_list!(UInt8Builder, UInt8, values, size),
- DataType::UInt16 => build_list!(UInt16Builder, UInt16, values, size),
- DataType::UInt32 => build_list!(UInt32Builder, UInt32, values, size),
- DataType::UInt64 => build_list!(UInt64Builder, UInt64, values, size),
- DataType::Utf8 => build_list!(StringBuilder, Utf8, values, size),
- DataType::Float32 => build_list!(Float32Builder, Float32, values, size),
- DataType::Float64 => build_list!(Float64Builder, Float64, values, size),
- DataType::Timestamp(unit, tz) => {
- build_timestamp_list!(unit.clone(), tz.clone(), values, size)
- }
- &DataType::LargeUtf8 => {
- build_list!(LargeStringBuilder, LargeUtf8, values, size)
- }
- _ => ScalarValue::iter_to_array_list(
- repeat(self.clone()).take(size),
- &DataType::List(Arc::new(Field::new(
- "item",
- field.data_type().clone(),
- true,
- ))),
- )
- .unwrap(),
- }),
+ ScalarValue::List(arr) => {
+ let arrays = std::iter::repeat(arr.as_ref())
+ .take(size)
+ .collect::<Vec<_>>();
+ arrow::compute::concat(arrays.as_slice()).unwrap()
Review Comment:
I thought that `concat` outputs the same type as the input array -- shouldn't
we be creating a `ListArray` here instead 🤔
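For reference, a small standalone check of that `concat` behaviour (the element type here is illustrative): concatenating a one-element `ListArray` with itself keeps the `List` data type and produces one row per repetition.

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int32Array, ListArray};
use arrow::buffer::OffsetBuffer;
use arrow::compute::concat;
use arrow::datatypes::{DataType, Field};

fn main() {
    // A single-element ListArray: [[1, 2, 3]]
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let item = Arc::new(Field::new("item", DataType::Int32, true));
    let list = ListArray::new(item.clone(), OffsetBuffer::from_lengths([3]), values, None);

    // Repeat it `size` times and concatenate, as the new code does.
    let size = 4;
    let repeated: Vec<&dyn Array> = std::iter::repeat(&list as &dyn Array).take(size).collect();
    let out = concat(&repeated).unwrap();

    // The output keeps the List data type and has one row per repetition.
    assert_eq!(out.data_type(), &DataType::List(item));
    assert_eq!(out.len(), size);
}
```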
##########
datafusion/common/src/scalar.rs:
##########
@@ -2878,6 +2984,10 @@ impl fmt::Display for ScalarValue {
)?,
None => write!(f, "NULL")?,
},
+ // Array does not implement Display
+ ScalarValue::List(arr) => {
+ write!(f, "{:?}", arr)?;
Review Comment:
What about using [`arrow::util::pretty::pretty_format_columns`](https://docs.rs/arrow/latest/arrow/util/pretty/fn.pretty_format_columns.html)?
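For reference, the suggested helper in isolation (needs arrow's `prettyprint` feature; the column name is arbitrary):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::util::pretty::pretty_format_columns;

fn main() {
    // Render the wrapped array as a small table instead of `{:?}` debug output.
    let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let formatted = pretty_format_columns("ScalarValue::List", &[arr]).unwrap();
    println!("{formatted}");
}
```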
##########
datafusion/common/src/scalar.rs:
##########
@@ -2093,18 +2203,29 @@ impl ScalarValue {
DataType::Utf8 => typed_cast!(array, index, StringArray, Utf8),
DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8),
DataType::List(nested_type) => {
- let list_array = as_list_array(array)?;
- let value = match list_array.is_null(index) {
- true => None,
+ let list_array = as_list_array(array);
+ let arr = match list_array.is_null(index) {
+ true => new_null_array(nested_type.data_type(), 0),
false => {
let nested_array = list_array.value(index);
- let scalar_vec = (0..nested_array.len())
- .map(|i| ScalarValue::try_from_array(&nested_array, i))
- .collect::<Result<Vec<_>>>()?;
- Some(scalar_vec)
+ Arc::new(wrap_into_list_array(nested_array))
}
};
- ScalarValue::new_list(value, nested_type.data_type().clone())
+
+ ScalarValue::List(arr)
+ }
+ // TODO: There is no test for FixedSizeList now, add it later
Review Comment:
Is this something you plan to do in this PR?
##########
datafusion/physical-expr/src/aggregate/array_agg_distinct.rs:
##########
@@ -125,22 +125,18 @@ impl DistinctArrayAggAccumulator {
impl Accumulator for DistinctArrayAggAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
- Ok(vec![ScalarValue::new_list(
- Some(self.values.clone().into_iter().collect()),
- self.datatype.clone(),
- )])
+ Ok(vec![self.evaluate()?])
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
assert_eq!(values.len(), 1, "batch input should only include 1 column!");
let array = &values[0];
- (0..array.len()).try_for_each(|i| {
- if !array.is_null(i) {
- self.values.insert(ScalarValue::try_from_array(array, i)?);
- }
- Ok(())
- })
+ let scalars = ScalarValue::convert_array_to_scalar_vec(array)?;
Review Comment:
this is a very nice improvement
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]