alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1715835534


##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -123,7 +151,7 @@ pub trait GroupsAccumulator: Send {
     /// future use. The group_indices on subsequent calls to
     /// `update_batch` or `merge_batch` will be shifted down by
     /// `n`. See [`EmitTo::First`] for more details.
-    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

Review Comment:
   it would help to document what expectations are on the Vec of array refs



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs:
##########
@@ -68,11 +70,21 @@ where
     fn update_batch(

Review Comment:
   In order to realize the benefit of this blocked implementation I think you 
will need to change the state of the accumulators so that instead of a single 
large buffer
   
   ```rust
       /// values per group
       values: BooleanBufferBuilder,
   ```
   
   The state is held in chunks like
   ```rust
       /// blocks of values per group
       values: Vec<BooleanBufferBuilder>
   ```
   
   (or possibly this to support taking them out individually)
   ```rust
       /// blocks of values per group, None when taken
       values: Vec<Option<BooleanBufferBuilder>>
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream {
 
     /// scratch space for the current input [`RecordBatch`] being
     /// processed. Reused across batches here to avoid reallocations
-    current_group_indices: Vec<usize>,
+    current_group_indices: Vec<u64>,

Review Comment:
   Another alternative might be to ensure that the block size is aligned across 
the aggegators and group values -- that way there would be no stitching arrays 
together into batches during emission



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -31,6 +31,13 @@ pub enum EmitTo {
     /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
     /// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`.
     First(usize),
+    /// Emit all groups managed by blocks
+    AllBlocks,
+    /// Emit only the first `n` group blocks,
+    /// similar as `First`, but used in blocked `GroupValues` and 
`GroupAccumulator`.
+    ///
+    /// For example, `n=3`, `block size=4`, finally 12 groups will be returned.
+    FirstBlocks(usize),

Review Comment:
   Rather than having two parallel emission modes for blocked output, I wonder 
if we could have some sort of "take" mode whose semantics did not shift the 
existing values down
   
   For example, what if we introduced a notion of "block" across the group keys 
and and aggregators
   
   ```rust
   pub enum EmitTo {
       /// Same
       All,
       /// Same
       First(usize),
       /// Takes the N'th block of rows from this accumulator
       /// it is an error to take the same batch twice or to emit `All` or 
`First` 
       /// after any TakeBatch(usize)
       TakeBlock(usize)
   }
   ```
   
   And then we would for example, make sure the group values and aggregators 
all saved data using blocks of 100K rows
   
   Then to emit 1M rows, the accumulators would emit like
   ```
   EmitTo::TakeBlock(0)
   EmitTo::TakeBlock(1)
   EmitTo::TakeBlock(2)
   ...
   EmitTo::TakeBlock(9)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to