jayzhan211 commented on code in PR #7893:
URL: https://github.com/apache/arrow-datafusion/pull/7893#discussion_r1448143623
##########
datafusion/physical-expr/src/aggregate/array_agg_ordered.rs:
##########
@@ -224,28 +229,32 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
partition_values.push(self.values.clone());
partition_ordering_values.push(self.ordering_values.clone());
+ assert!(as_list_array(array_agg_values).is_ok());
+ // Convert array to Scalars to sort them easily. Convert back to
array at evaluation.
let array_agg_res =
ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;
-
- for v in array_agg_res.into_iter() {
- partition_values.push(v);
- }
+ partition_values.extend(array_agg_res);
let orderings =
ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
-
for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each
group/partition
let ordering_value =
partition_ordering_rows.into_iter().map(|ordering_row| {
- if let
ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
+ if let ScalarValue::Struct(s) = ordering_row {
+ let mut ordering_columns_per_row = vec![];
+
+ for column in s.columns() {
+ let sv = ScalarValue::try_from_array(column,
0)?;
+ ordering_columns_per_row.push(sv);
+ }
+
Ok(ordering_columns_per_row)
} else {
exec_err!(
Review Comment:
todo: internal_err
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]