NGA-TRAN commented on a change in pull request #808:
URL: https://github.com/apache/arrow-datafusion/pull/808#discussion_r685543182



##########
File path: datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -779,8 +553,47 @@ impl GroupedHashAggregateStream {
 }
 
 type AccumulatorItem = Box<dyn Accumulator>;
-type Accumulators =
-    HashMap<Vec<u8>, (Box<[ScalarValue]>, Vec<AccumulatorItem>, Vec<u32>), 
RandomState>;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+    /// The actual group by values, one for each group column
+    group_by_values: Box<[ScalarValue]>,
+
+    // Accumulator state, one for each aggregate
+    accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// bach that have values to aggregate. Reset on each batch
+    indices: Vec<u32>,
+}

Review comment:
       Nice

##########
File path: datafusion/src/scalar.rs
##########
@@ -973,22 +1011,106 @@ impl ScalarValue {
         })
     }
 
-    fn try_from_dict_array<K: ArrowDictionaryKeyType>(
+    /// Compares array @ index for equality with self, in an optimized fashion
+    ///
+    /// This method implements an optimized version of:
+    ///
+    /// ``text
+    ///     let arr_scalar = Self::try_from_array(array, index).unwrap();
+    ///     arr_scalar.eq(self)
+    /// ```
+    #[inline]
+    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
+        if let DataType::Dictionary(key_type, _) = array.data_type() {
+            return self.eq_array_dictionary(array, index, key_type);
+        }
+
+        match self {
+            ScalarValue::Boolean(val) => {
+                eq_array_primitive!(array, index, BooleanArray, val)
+            }
+            ScalarValue::Float32(val) => {
+                eq_array_primitive!(array, index, Float32Array, val)
+            }
+            ScalarValue::Float64(val) => {
+                eq_array_primitive!(array, index, Float64Array, val)
+            }
+            ScalarValue::Int8(val) => eq_array_primitive!(array, index, 
Int8Array, val),
+            ScalarValue::Int16(val) => eq_array_primitive!(array, index, 
Int16Array, val),
+            ScalarValue::Int32(val) => eq_array_primitive!(array, index, 
Int32Array, val),
+            ScalarValue::Int64(val) => eq_array_primitive!(array, index, 
Int64Array, val),
+            ScalarValue::UInt8(val) => eq_array_primitive!(array, index, 
UInt8Array, val),
+            ScalarValue::UInt16(val) => {
+                eq_array_primitive!(array, index, UInt16Array, val)
+            }
+            ScalarValue::UInt32(val) => {
+                eq_array_primitive!(array, index, UInt32Array, val)
+            }
+            ScalarValue::UInt64(val) => {
+                eq_array_primitive!(array, index, UInt64Array, val)
+            }
+            ScalarValue::Utf8(val) => eq_array_primitive!(array, index, 
StringArray, val),
+            ScalarValue::LargeUtf8(val) => {
+                eq_array_primitive!(array, index, LargeStringArray, val)
+            }
+            ScalarValue::Binary(val) => {
+                eq_array_primitive!(array, index, BinaryArray, val)
+            }
+            ScalarValue::LargeBinary(val) => {
+                eq_array_primitive!(array, index, LargeBinaryArray, val)
+            }
+            ScalarValue::List(_, _) => unimplemented!(),
+            ScalarValue::Date32(val) => {
+                eq_array_primitive!(array, index, Date32Array, val)
+            }
+            ScalarValue::Date64(val) => {
+                eq_array_primitive!(array, index, Date64Array, val)
+            }
+            ScalarValue::TimestampSecond(val) => {
+                eq_array_primitive!(array, index, TimestampSecondArray, val)
+            }
+            ScalarValue::TimestampMillisecond(val) => {
+                eq_array_primitive!(array, index, TimestampMillisecondArray, 
val)
+            }
+            ScalarValue::TimestampMicrosecond(val) => {
+                eq_array_primitive!(array, index, TimestampMicrosecondArray, 
val)
+            }
+            ScalarValue::TimestampNanosecond(val) => {
+                eq_array_primitive!(array, index, TimestampNanosecondArray, 
val)
+            }
+            ScalarValue::IntervalYearMonth(val) => {
+                eq_array_primitive!(array, index, IntervalYearMonthArray, val)
+            }
+            ScalarValue::IntervalDayTime(val) => {
+                eq_array_primitive!(array, index, IntervalDayTimeArray, val)
+            }
+        }
+    }

Review comment:
       Nice

##########
File path: datafusion/src/scalar.rs
##########
@@ -1654,6 +1776,159 @@ mod tests {
         assert_eq!(std::mem::size_of::<ScalarValue>(), 32);
     }
 
+    #[test]
+    fn scalar_eq_array() {
+        // Validate that eq_array has the same semantics as ScalarValue::eq
+        macro_rules! make_typed_vec {
+            ($INPUT:expr, $TYPE:ident) => {{
+                $INPUT
+                    .iter()
+                    .map(|v| v.map(|v| v as $TYPE))
+                    .collect::<Vec<_>>()
+            }};
+        }
+
+        let bool_vals = vec![Some(true), None, Some(false)];
+        let f32_vals = vec![Some(-1.0), None, Some(1.0)];
+        let f64_vals = make_typed_vec!(f32_vals, f64);
+
+        let i8_vals = vec![Some(-1), None, Some(1)];
+        let i16_vals = make_typed_vec!(i8_vals, i16);
+        let i32_vals = make_typed_vec!(i8_vals, i32);
+        let i64_vals = make_typed_vec!(i8_vals, i64);
+
+        let u8_vals = vec![Some(0), None, Some(1)];
+        let u16_vals = make_typed_vec!(u8_vals, u16);
+        let u32_vals = make_typed_vec!(u8_vals, u32);
+        let u64_vals = make_typed_vec!(u8_vals, u64);
+
+        let str_vals = vec![Some("foo"), None, Some("bar")];

Review comment:
       I wonder why all the second value is always NULL? Will it be more 
general to have it random (first or third)?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to