alamb commented on code in PR #6932:
URL: https://github.com/apache/arrow-datafusion/pull/6932#discussion_r1267869895


##########
datafusion/core/src/physical_plan/aggregates/order/partial.rs:
##########
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_expr::EmitTo;
+use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+/// Tracks grouping state when the data is ordered by some subset of
+/// the group keys.
+///
+/// Once the next *sort key* value is seen, never see groups with that
+/// sort key again, so we can emit all groups with the previous sort
+/// key and earlier.
+///
+/// For example, given `SUM(amt) GROUP BY id, state` if the input is
+/// sorted by `state`, when a new value of `state` is seen, all groups
+/// with prior values of `state` can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///                                            ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓
+///     ┌─────┐    ┌───────────────────┐ ┌─────┃        9        ┃ ┃ "MD"  ┃
+///     │┌───┐│    │ ┌──────────────┐  │ │     ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
+///     ││ 0 ││    │ │  123, "MA"   │  │ │        current_sort      sort_key
+///     │└───┘│    │ └──────────────┘  │ │
+///     │ ... │    │    ...            │ │      current_sort tracks the most

Review Comment:
   Agreed -- changed to 'smallest' in ddaf0e7fe



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -320,36 +349,36 @@ impl Stream for GroupedHashAggregateStream {
                         // new batch to aggregate
                         Some(Ok(batch)) => {
                             let timer = elapsed_compute.timer();
-                            let result = self.group_aggregate_batch(batch);
-                            timer.done();
-
-                            // allocate memory AFTER we actually used
-                            // the memory, which simplifies the whole
-                            // accounting and we are OK with
-                            // overshooting a bit.
-                            //
-                            // Also this means we either store the
-                            // whole record batch or not.
-                            let result = result.and_then(|allocated| {
-                                self.reservation.try_grow(allocated)
-                            });
-
-                            if let Err(e) = result {
-                                return Poll::Ready(Some(Err(e)));
+                            // Do the grouping
+                            extract_ok!(self.group_aggregate_batch(batch));
+
+                            // If we can begin emitting rows, do so,
+                            // otherwise keep consuming input
+                            let to_emit = if self.input_done {

Review Comment:
   I agree -- `input_done` can't be true here as we just got a batch. I will 
change it to an assert. 372196ec7



##########
datafusion/execution/src/memory_pool/mod.rs:
##########
@@ -221,6 +219,49 @@ pub fn human_readable_size(size: usize) -> String {
     format!("{value:.1} {unit}")
 }
 
+/// Tracks the change in memory to avoid overflow. Typically, this
+/// is used like the following
+///
+/// 1. Call `delta.dec(sized_thing.size())`
+///
+/// 2. potentially change size of `sized_thing`
+///
+/// 3. Call `delta.inc(sized_thing.size())`
+#[derive(Debug, Default)]
+pub struct MemoryDelta {

Review Comment:
   Comments in e7dc2ae8c



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -519,40 +556,100 @@ impl GroupedHashAggregateStream {
                         )?;
                     }
                 }
-
-                allocated += acc.size();
-                allocated -= acc_size_pre;
+                memory_delta.inc(acc.size());
             }
         }
-        allocated += self.row_converter.size();
-        allocated -= row_converter_size_pre;
+        memory_delta.inc(self.state_size());
 
-        Ok(allocated)
+        // Update allocation AFTER it is used, simplifying accounting,
+        // though it results in a temporary overshoot.
+        memory_delta.update(&mut self.reservation)
     }
 
-    /// Create an output RecordBatch with all group keys and accumulator 
states/values
-    fn create_batch_from_map(&mut self) -> Result<RecordBatch> {
+    /// Create an output RecordBatch with the group keys and
+    /// accumulator states/values specified in emit_to
+    fn create_batch_from_map(&mut self, emit_to: EmitTo) -> 
Result<RecordBatch> {
         if self.group_values.num_rows() == 0 {
-            let schema = self.schema.clone();
-            return Ok(RecordBatch::new_empty(schema));
+            return Ok(RecordBatch::new_empty(self.schema()));
         }
 
+        let output = self.build_output(emit_to)?;
+        self.remove_emitted(emit_to)?;
+        let batch = RecordBatch::try_new(self.schema(), output)?;
+        Ok(batch)
+    }
+
+    /// Creates output: `(group 1, group 2, ... agg 1, agg 2, ...)`
+    fn build_output(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
         // First output rows are the groups
-        let groups_rows = self.group_values.iter();
-        let mut output: Vec<ArrayRef> = 
self.row_converter.convert_rows(groups_rows)?;
+        let mut output: Vec<ArrayRef> = match emit_to {
+            EmitTo::All => {
+                let groups_rows = self.group_values.iter();
+                self.row_converter.convert_rows(groups_rows)?
+            }
+            EmitTo::First(n) => {
+                let groups_rows = self.group_values.iter().take(n);
+                self.row_converter.convert_rows(groups_rows)?
+            }
+        };
 
-        // Next output each aggregate value, from the accumulators
+        // Next output each aggregate value
         for acc in self.accumulators.iter_mut() {
             match self.mode {
-                AggregateMode::Partial => output.extend(acc.state()?),
+                AggregateMode::Partial => output.extend(acc.state(emit_to)?),
                 AggregateMode::Final
                 | AggregateMode::FinalPartitioned
                 | AggregateMode::Single
-                | AggregateMode::SinglePartitioned => 
output.push(acc.evaluate()?),
+                | AggregateMode::SinglePartitioned => 
output.push(acc.evaluate(emit_to)?),
             }
         }
 
-        Ok(RecordBatch::try_new(self.schema.clone(), output)?)
+        Ok(output)
+    }
+
+    /// Removes the first `n` groups, adjusting all group_indices
+    /// appropriately
+    fn remove_emitted(&mut self, emit_to: EmitTo) -> Result<()> {
+        let mut memory_delta = MemoryDelta::new();
+        memory_delta.dec(self.state_size());
+
+        match emit_to {
+            EmitTo::All => {
+                // Eventually we may also want to clear the hash table here
+                //self.map.clear();
+            }
+            EmitTo::First(n) => {
+                // Clear out first n group keys by copying them to a new Rows.
+                // TODO file some ticket in arrow-rs to make this more 
efficient?

Review Comment:
   In this case, the group_values also need to support "remove the first N 
values" even if we removed the row format from the partial order state



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -303,6 +322,16 @@ fn create_group_accumulator(
     }
 }
 
+/// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with 
errors
+macro_rules! extract_ok {

Review Comment:
   I tried this - I am not sure it is better -- I will put up a follow on PR 
https://github.com/apache/arrow-datafusion/pull/7025



##########
datafusion/core/src/physical_plan/aggregates/order/full.rs:
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+
+use crate::physical_expr::EmitTo;
+
+/// Tracks grouping state when the data is ordered entirely by its
+/// group keys
+///
+/// When the group values are sorted, as soon as we see group `n+1` we
+/// know we will never see any rows for group `n` again and thus they
+/// can be emitted.
+///
+/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
+/// by `id` as soon as a new `id` value is seen all previous values
+/// can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///      ┌─────┐   ┌──────────────────┐
+///      │┌───┐│   │ ┌──────────────┐ │         ┏━━━━━━━━━━━━━━┓
+///      ││ 0 ││   │ │     123      │ │   ┌─────┃      13      ┃
+///      │└───┘│   │ └──────────────┘ │   │     ┗━━━━━━━━━━━━━━┛
+///      │ ... │   │    ...           │   │
+///      │┌───┐│   │ ┌──────────────┐ │   │         current
+///      ││12 ││   │ │     234      │ │   │
+///      │├───┤│   │ ├──────────────┤ │   │
+///      ││12 ││   │ │     234      │ │   │
+///      │├───┤│   │ ├──────────────┤ │   │
+///      ││13 ││   │ │     456      │◀┼───┘
+///      │└───┘│   │ └──────────────┘ │
+///      └─────┘   └──────────────────┘
+///
+///  group indices    group_values        current tracks the most
+/// (in group value                          recent group index
+///      order)
+/// ```
+///
+/// In this diagram, the current group is `13`, and thus groups
+/// `0..12` can be emitted. Note that `13` can not yet be emitted as
+/// there may be more values in the next batch with the same group_id.
+#[derive(Debug)]
+pub(crate) struct GroupOrderingFull {
+    state: State,
+    /// Hash values for groups in 0..current
+    hashes: Vec<u64>,
+}
+
+#[derive(Debug)]
+enum State {
+    /// Seen no input yet
+    Start,
+
+    /// Data is in progress. `current` is the current group for which
+    /// values are being generated. Can emit `current` - 1
+    InProgress { current: usize },
+
+    /// Seen end of input: all groups can be emitted
+    Complete,
+}
+
+impl GroupOrderingFull {
+    pub fn new() -> Self {
+        Self {
+            state: State::Start,
+            hashes: vec![],
+        }
+    }
+
+    // How many groups can be emitted, or None if no data can be emitted
+    pub fn emit_to(&self) -> Option<EmitTo> {
+        match &self.state {
+            State::Start => None,
+            State::InProgress { current, .. } => {
+                if *current == 0 {
+                    // Can not emit if still on the first row
+                    None
+                } else {
+                    // otherwise emit all rows prior to the current group
+                    Some(EmitTo::First(*current))
+                }
+            }
+            State::Complete { .. } => Some(EmitTo::All),
+        }
+    }
+
+    /// remove the first n groups from the internal state, shifting
+    /// all existing indexes down by `n`. Returns stored hash values
+    pub fn remove_groups(&mut self, n: usize) -> &[u64] {
+        match &mut self.state {
+            State::Start => panic!("invalid state: start"),
+            State::InProgress { current } => {
+                // shift down by n
+                assert!(*current >= n);
+                *current -= n;
+                self.hashes.drain(0..n);
+            }
+            State::Complete { .. } => panic!("invalid state: complete"),
+        };
+        &self.hashes
+    }
+
+    /// Note that the input is complete so any outstanding groups are done as 
well
+    pub fn input_done(&mut self) {
+        self.state = State::Complete;
+    }
+
+    /// Called when new groups are added in a batch. See documentation
+    /// on [`super::GroupOrdering::new_groups`]
+    pub fn new_groups(
+        &mut self,
+        group_indices: &[usize],
+        batch_hashes: &[u64],

Review Comment:
   That is a (very) good point -- noted in 
https://github.com/apache/arrow-datafusion/issues/7023 for follow on work



##########
datafusion/core/src/physical_plan/aggregates/order/partial.rs:
##########
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::physical_expr::EmitTo;
+use arrow::row::{OwnedRow, RowConverter, Rows, SortField};
+use arrow_array::ArrayRef;
+use arrow_schema::Schema;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::proxy::VecAllocExt;
+use datafusion_physical_expr::PhysicalSortExpr;
+
+/// Tracks grouping state when the data is ordered by some subset of
+/// the group keys.
+///
+/// Once the next *sort key* value is seen, never see groups with that
+/// sort key again, so we can emit all groups with the previous sort
+/// key and earlier.
+///
+/// For example, given `SUM(amt) GROUP BY id, state` if the input is
+/// sorted by `state`, when a new value of `state` is seen, all groups
+/// with prior values of `state` can be emitted.
+///
+/// The state is tracked like this:
+///
+/// ```text
+///                                            ┏━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━┓
+///     ┌─────┐    ┌───────────────────┐ ┌─────┃        9        ┃ ┃ "MD"  ┃
+///     │┌───┐│    │ ┌──────────────┐  │ │     ┗━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━┛
+///     ││ 0 ││    │ │  123, "MA"   │  │ │        current_sort      sort_key
+///     │└───┘│    │ └──────────────┘  │ │
+///     │ ... │    │    ...            │ │      current_sort tracks the most
+///     │┌───┐│    │ ┌──────────────┐  │ │      recent group index that had
+///     ││12 ││    │ │  765, "MA"   │  │ │      the same sort_key as current
+///     │├───┤│    │ ├──────────────┤  │ │
+///     ││12 ││    │ │  923, "MD"   │◀─┼─┘
+///     │├───┤│    │ ├──────────────┤  │        ┏━━━━━━━━━━━━━━┓
+///     ││13 ││    │ │  345, "MD"   │◀─┼────────┃      12      ┃
+///     │└───┘│    │ └──────────────┘  │        ┗━━━━━━━━━━━━━━┛
+///     └─────┘    └───────────────────┘            current
+///  group indices
+/// (in group value  group_values               current tracks the most
+///      order)                                    recent group index
+///```
+#[derive(Debug)]
+pub(crate) struct GroupOrderingPartial {
+    /// State machine
+    state: State,
+
+    /// The indexes of the group by columns that form the sort key.
+    /// For example if grouping by `id, state` and ordered by `state`
+    /// this would be `[1]`.
+    order_indices: Vec<usize>,
+
+    /// Converter for the sort key (used on the group columns
+    /// specified in `order_indices`)
+    row_converter: RowConverter,
+
+    /// Hash values for groups in 0..completed
+    hashes: Vec<u64>,
+}
+
+#[derive(Debug, Default)]
+enum State {
+    /// The ordering was temporarily taken.  `Self::Taken` is left
+    /// when state must be temporarily taken to satisfy the borrow
+    /// checker. If an error happens before the state can be restored,
+    /// the ordering information is lost and execution can not
+    /// proceed, but there is no undefined behavior.
+    #[default]
+    Taken,
+
+    /// Seen no input yet
+    Start,
+
+    /// Data is in progress.
+    InProgress {
+        /// first group index with the sort_key
+        current_sort: usize,
+        /// The sort key of group_index `current_sort`
+        sort_key: OwnedRow,

Review Comment:
   This is an excellent idea -- I filed it as  
https://github.com/apache/arrow-datafusion/issues/7023 for follow on work
   
   From my perspective this PR will already be faster than the existing 
streaming group by (which uses `ScalarValue` to track the sort keys) so I think 
it is acceptable to merge as is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to