crepererum commented on code in PR #4924:
URL: https://github.com/apache/arrow-datafusion/pull/4924#discussion_r1072017449


##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -580,3 +724,26 @@ fn read_as_batch(rows: &[Vec<u8>], schema: &Schema, 
row_type: RowType) -> Vec<Ar
 
     output.output_as_columns()
 }
+
+fn get_at_indices(
+    input_values: &[Vec<Arc<dyn Array>>],
+    batch_indices: &PrimitiveArray<UInt32Type>,
+) -> Vec<Vec<Arc<dyn Array>>> {
+    input_values
+        .iter()
+        .map(|array| {
+            array
+                .iter()
+                .map(|array| {
+                    compute::take(
+                        array.as_ref(),
+                        batch_indices,
+                        None, // None: no index check
+                    )
+                    .unwrap()
+                })
+                .collect()
+            // 2.3

Review Comment:
   What's 2.3 here?



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -70,29 +74,33 @@ use hashbrown::raw::RawTable;
 ///
 /// [Compact]: datafusion_row::layout::RowType::Compact
 /// [WordAligned]: datafusion_row::layout::RowType::WordAligned
-pub(crate) struct GroupedHashAggregateStreamV2 {
+pub(crate) struct GroupedHashAggregateStream {
     stream: BoxStream<'static, ArrowResult<RecordBatch>>,
     schema: SchemaRef,
 }
 
-/// Actual implementation of [`GroupedHashAggregateStreamV2`].
+/// Actual implementation of [`GroupedHashAggregateStream`].
 ///
 /// This is wrapped into yet another struct because we need to interact with 
the async memory management subsystem
 /// during poll. To have as little code "weirdness" as possible, we chose to 
just use [`BoxStream`] together with
-/// [`futures::stream::unfold`]. The latter requires a state object, which is 
[`GroupedHashAggregateStreamV2Inner`].
-struct GroupedHashAggregateStreamV2Inner {
+/// [`futures::stream::unfold`]. The latter requires a state object, which is 
[`GroupedHashAggregateStreamInner`].
+struct GroupedHashAggregateStreamInner {
     schema: SchemaRef,
     input: SendableRecordBatchStream,
     mode: AggregateMode,
-    aggr_state: AggregationState,
-    aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    normal_aggr_expr: Vec<Arc<dyn AggregateExpr>>,
+    row_aggr_state: RowAggregationState,
+    /// Aggregate expressions not supporting row accumulation
+    normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,

Review Comment:
   So we have one operator now but still two accumulator implementations? I 
think this counts as "closing the ticket" though. We still have massive 
code duplication in accumulator implementations. Furthermore, the implementation 
of this specific stream gets more complicated, but I agree that this would be 
one possible path forward.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -332,34 +386,46 @@ fn group_aggregate_batch(
 
                     group_state
                         .indices
-                        .push_accounted(row as u32, &mut allocated); // 
remember this row
+                        .push_accounted(row as u32, &mut row_allocated); // 
remember this row
                 }
                 //  1.2 Need to create new entry
                 None => {
+                    let accumulator_set =
+                        aggregates::create_accumulators(normal_aggr_expr)?;
                     // Add new entry to group_states and save newly created 
index
                     let group_state = RowGroupState {
                         group_by_values: group_rows.row(row).owned(),
                         aggregation_buffer: vec![0; 
state_layout.fixed_part_width()],
+                        accumulator_set,
                         indices: vec![row as u32], // 1.3
                     };
-                    let group_idx = group_states.len();
+                    let group_idx = row_group_states.len();
 
                     // NOTE: do NOT include the `RowGroupState` struct size in 
here because this is captured by
                     // `group_states` (see allocation down below)
-                    allocated += (std::mem::size_of::<u8>()
+                    row_allocated += (std::mem::size_of::<u8>()
                         * group_state.group_by_values.as_ref().len())
                         + (std::mem::size_of::<u8>()
                             * group_state.aggregation_buffer.capacity())
                         + (std::mem::size_of::<u32>() * 
group_state.indices.capacity());
 
+                    // Allocation done by normal accumulators
+                    normal_allocated += (std::mem::size_of::<Box<dyn 
Accumulator>>()

Review Comment:
   Why is there a 2nd memory tracking variable? Also, as far as I can tell, 
this is never used (you only add to it but never read it).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to