alamb commented on code in PR #15667: URL: https://github.com/apache/datafusion/pull/15667#discussion_r2072364948
########## datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs: ########## @@ -193,6 +193,14 @@ pub fn set_nulls_dyn(input: &dyn Array, nulls: Option<NullBuffer>) -> Result<Arr )) } } + DataType::Struct(_) => unsafe { + let input = input.as_struct(); + Arc::new(StructArray::new_unchecked( Review Comment: can you please add a safety note here (probably can just reuse the same as above) ########## datafusion/common/src/scalar/mod.rs: ########## @@ -616,7 +616,20 @@ fn partial_cmp_list(arr1: &dyn Array, arr2: &dyn Array) -> Option<Ordering> { Some(arr1.len().cmp(&arr2.len())) } -fn partial_cmp_struct(s1: &Arc<StructArray>, s2: &Arc<StructArray>) -> Option<Ordering> { +fn expand_struct_columns<'a>(array: &'a StructArray, columns: &mut Vec<&'a ArrayRef>) { Review Comment: I think a more standard name for this operation is "flatten" but since this is an internal function I think it is fine too ########## datafusion/functions-aggregate/src/min_max.rs: ########## @@ -619,6 +625,45 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> { }) } +fn min_max_batch_struct(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> { + if array.len() == array.null_count() { + return ScalarValue::try_from(array.data_type()); + } + let mut extreme = ScalarValue::try_from_array(array, 0)?; + for i in 1..array.len() { + let current = ScalarValue::try_from_array(array, i)?; + if current.is_null() { + continue; + } + if extreme.is_null() { + extreme = current; + continue; + } + if let Some(cmp) = extreme.partial_cmp(¤t) { + if cmp == ordering { + extreme = current; + } + } + } + // use deep_clone to free array reference + Ok(extreme.deep_clone()) +} + +macro_rules! 
min_max_struct { + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + if $VALUE.is_null() { + $DELTA.clone() + } else if $DELTA.is_null() { + $VALUE.clone() + } else { + match $VALUE.partial_cmp(&$DELTA) { + Some(choose_min_max!($OP)) => $DELTA.clone(), Review Comment: should this also use deep_clone? ########## datafusion/functions-aggregate/src/min_max.rs: ########## @@ -619,6 +625,45 @@ fn min_batch(values: &ArrayRef) -> Result<ScalarValue> { }) } +fn min_max_batch_struct(array: &ArrayRef, ordering: Ordering) -> Result<ScalarValue> { + if array.len() == array.null_count() { + return ScalarValue::try_from(array.data_type()); + } + let mut extreme = ScalarValue::try_from_array(array, 0)?; + for i in 1..array.len() { + let current = ScalarValue::try_from_array(array, i)?; + if current.is_null() { + continue; + } + if extreme.is_null() { + extreme = current; + continue; + } + if let Some(cmp) = extreme.partial_cmp(¤t) { + if cmp == ordering { + extreme = current; + } + } + } + // use deep_clone to free array reference + Ok(extreme.deep_clone()) +} + +macro_rules! min_max_struct { + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + if $VALUE.is_null() { + $DELTA.clone() + } else if $DELTA.is_null() { + $VALUE.clone() + } else { + match $VALUE.partial_cmp(&$DELTA) { + Some(choose_min_max!($OP)) => $DELTA.clone(), + _ => $VALUE.clone(), + } + } + }}; +} + /// dynamically-typed max(array) -> ScalarValue pub fn max_batch(values: &ArrayRef) -> Result<ScalarValue> { Review Comment: I wonder if we could potentially remove the duplicated code in this function by calling into MinAccumulator / max accumulator as necessary. I can't remember why there is a second copy of this code This might be something good to do as a follow on PR ########## datafusion/sqllogictest/test_files/aggregate.slt: ########## @@ -6923,3 +6923,61 @@ select c2, count(*) from test WHERE 1 = 1 group by c2; 4 1 5 1 6 1 + +# Min/Max struct +query ?? 
rowsort +WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i)) +SELECT MIN(c), MAX(c) FROM (SELECT STRUCT(c1 AS 'a', c2 AS 'b') AS c FROM t) +---- +{a: 1, b: 2} {a: 10, b: 11} + +# Min/Max struct with NULL +query ?? rowsort +WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i)) +SELECT MIN(c), MAX(c) FROM (SELECT CASE WHEN c1 % 2 == 0 THEN STRUCT(c1 AS 'a', c2 AS 'b') ELSE NULL END AS c FROM t) +---- +{a: 2, b: 3} {a: 10, b: 11} + +# Min/Max struct with two recordbatch +query ?? rowsort +SELECT MIN(c), MAX(c) FROM (SELECT STRUCT(1 as 'a', 2 as 'b') AS c UNION SELECT STRUCT(3 as 'a', 4 as 'b') AS c ) +---- +{a: 1, b: 2} {a: 3, b: 4} + +# Min/Max struct empty +query ?? rowsort +SELECT MIN(c), MAX(c) FROM (SELECT * FROM (SELECT STRUCT(1 as 'a', 2 as 'b') AS c) LIMIT 0) +---- +NULL NULL + +# Min/Max group struct +query I?? rowsort +WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i)) +SELECT key, MIN(c), MAX(c) FROM (SELECT STRUCT(c1 AS 'a', c2 AS 'b') AS c, (c1 % 2) AS key FROM t) GROUP BY key +---- +0 {a: 2, b: 3} {a: 10, b: 11} +1 {a: 1, b: 2} {a: 9, b: 10} + +# Min/Max group struct with NULL +query I?? rowsort +WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i)) +SELECT key, MIN(c), MAX(c) FROM (SELECT CASE WHEN c1 % 2 == 0 THEN STRUCT(c1 AS 'a', c2 AS 'b') ELSE NULL END AS c, (c1 % 2) AS key FROM t) GROUP BY key +---- +0 {a: 2, b: 3} {a: 10, b: 11} +1 NULL NULL + +# Min/Max group struct with NULL +query I?? rowsort +WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i)) +SELECT key, MIN(c), MAX(c) FROM (SELECT CASE WHEN c1 % 3 == 0 THEN STRUCT(c1 AS 'a', c2 AS 'b') ELSE NULL END AS c, (c1 % 2) AS key FROM t) GROUP BY key +---- +0 {a: 6, b: 7} {a: 6, b: 7} +1 {a: 3, b: 4} {a: 9, b: 10} + +# Min/Max struct empty +query ?? 
rowsort +WITH t AS (SELECT i as c1, i + 1 as c2 FROM generate_series(1, 10) t(i)) +SELECT MIN(c), MAX(c) FROM (SELECT STRUCT(c1 AS 'a', c2 AS 'b') AS c, (c1 % 2) AS key FROM t LIMIT 0) GROUP BY key +---- + Review Comment: Can you please add additional tests for: 1. A struct with just one field (all the tests in this file so far have 2 fields) 2. Structs where the first field is equal, so the comparison has to go to the second field For 2 I am thinking something like this (where the first two fields are equal) ```sql > select min(column1),max(column1) from ( values ({"a":1,"b":2,"c":3}), ({"a":1,"b":2,"c":4}) ); +--------------------+--------------------+ | min(column1) | max(column1) | +--------------------+--------------------+ | {a: 1, b: 2, c: 3} | {a: 1, b: 2, c: 4} | +--------------------+--------------------+ 1 row(s) fetched. Elapsed 0.020 seconds. ``` ########## datafusion/common/src/scalar/mod.rs: ########## @@ -3415,6 +3434,58 @@ impl ScalarValue { .map(|sv| sv.size() - size_of_val(sv)) .sum::<usize>() } + + /// Performs a deep clone of the ScalarValue, creating new copies of all nested data structures. + /// This is different from the standard `clone()` which may share data through `Arc`. + /// Aggregation functions like `max` will cost a lot of memory if the data is not cloned. + pub fn deep_clone(&self) -> Self { Review Comment: I think `deep_clone` is a fine name. Another one might be `force_clone` or something to indicate it is cloning the underlying data (not just array refs) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org