alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1715835534
##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -123,7 +151,7 @@ pub trait GroupsAccumulator: Send {
     /// future use. The group_indices on subsequent calls to
     /// `update_batch` or `merge_batch` will be shifted down by
     /// `n`. See [`EmitTo::First`] for more details.
-    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

Review Comment:
It would help to document the expectations on the returned `Vec<ArrayRef>`.

##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs:
##########
@@ -68,11 +70,21 @@ where
     fn update_batch(

Review Comment:
To realize the benefit of this blocked implementation, I think you will need to change the state of the accumulators so that, instead of a single large buffer:

```rust
    /// values per group
    values: BooleanBufferBuilder,
```

the state is held in chunks, like:

```rust
    /// blocks of values per group
    values: Vec<BooleanBufferBuilder>,
```

(or possibly this, to support taking them out individually):

```rust
    /// blocks of values per group, None when taken
    values: Vec<Option<BooleanBufferBuilder>>,
```

##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -353,7 +355,7 @@ pub(crate) struct GroupedHashAggregateStream {
     /// scratch space for the current input [`RecordBatch`] being
     /// processed. Reused across batches here to avoid reallocations
-    current_group_indices: Vec<usize>,
+    current_group_indices: Vec<u64>,

Review Comment:
Another alternative might be to ensure that the block size is aligned across the aggregators and group values; that way, there would be no need to stitch arrays together into batches during emission.

##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -31,6 +31,13 @@ pub enum EmitTo {
     /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
     /// and group indexes '`10, 11, 12, ...` become `0, 1, 2, ...`.
     First(usize),
+    /// Emit all groups managed by blocks
+    AllBlocks,
+    /// Emit only the first `n` group blocks,
+    /// similar as `First`, but used in blocked `GroupValues` and `GroupAccumulator`.
+    ///
+    /// For example, `n=3`, `block size=4`, finally 12 groups will be returned.
+    FirstBlocks(usize),

Review Comment:
Rather than having two parallel emission modes for blocked output, I wonder if we could have some sort of "take" mode whose semantics do not shift the existing values down.

For example, what if we introduced a notion of a "block" across the group keys and aggregators:

```rust
pub enum EmitTo {
    /// Same as today
    All,
    /// Same as today
    First(usize),
    /// Takes the N'th block of rows from this accumulator.
    /// It is an error to take the same block twice, or to emit `All` or `First`
    /// after any `TakeBlock(usize)`.
    TakeBlock(usize),
}
```

Then we would, for example, make sure the group values and aggregators all store their data in blocks of 100K rows.

Then, to emit 1M rows, the accumulators would be called with:

```
EmitTo::TakeBlock(0)
EmitTo::TakeBlock(1)
EmitTo::TakeBlock(2)
...
EmitTo::TakeBlock(9)
```
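The review suggests two related changes: holding accumulator state as blocks (`Vec<Option<BooleanBufferBuilder>>`) and emitting with a non-shifting `EmitTo::TakeBlock(usize)`. A minimal, self-contained sketch combining them follows, under stated assumptions: it is not DataFusion code, `Vec<bool>` stands in for arrow's `BooleanBufferBuilder` so no external crate is needed, the simplified `EmitTo` only mirrors the proposed enum, and `BlockedBoolOr`, `resize`, `update`, and `evaluate` are hypothetical names. Group `g` lives in block `g / block_size` at offset `g % block_size`, and taking a block moves it out with `Option::take`, so no other block is shifted or copied.

```rust
/// Simplified stand-in for the proposed `EmitTo` (hypothetical, not DataFusion's type).
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
    All,
    First(usize),
    TakeBlock(usize),
}

/// Hypothetical blocked `bool_or` accumulator. Each block is a `Vec<bool>`
/// standing in for arrow's `BooleanBufferBuilder`; `None` marks a taken block.
pub struct BlockedBoolOr {
    block_size: usize,
    blocks: Vec<Option<Vec<bool>>>,
    num_groups: usize,
    /// Set once any `TakeBlock` has been requested.
    taken_any: bool,
}

impl BlockedBoolOr {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        Self { block_size, blocks: Vec::new(), num_groups: 0, taken_any: false }
    }

    /// Grow storage to `total_num_groups`, starting a new block every
    /// `block_size` groups. Assumes no growth after emission has started.
    pub fn resize(&mut self, total_num_groups: usize) {
        while self.num_groups < total_num_groups {
            if self.num_groups % self.block_size == 0 {
                self.blocks.push(Some(Vec::with_capacity(self.block_size)));
            }
            let last = self.blocks.last_mut().and_then(|b| b.as_mut());
            last.expect("cannot grow a taken block").push(false);
            self.num_groups += 1;
        }
    }

    /// OR `value` into group `group_index` (block = index / size, offset = index % size).
    pub fn update(&mut self, group_index: usize, value: bool) {
        let (block, offset) = (group_index / self.block_size, group_index % self.block_size);
        let values = self.blocks[block].as_mut().expect("block already taken");
        values[offset] |= value;
    }

    /// Concatenate every remaining block (only used before any take).
    fn flatten(&self) -> Vec<bool> {
        self.blocks.iter().flatten().flatten().copied().collect()
    }

    pub fn evaluate(&mut self, emit_to: EmitTo) -> Result<Vec<bool>, String> {
        match emit_to {
            // Move block `i` out without shifting or copying any other block.
            EmitTo::TakeBlock(i) => {
                self.taken_any = true;
                self.blocks
                    .get_mut(i)
                    .and_then(|slot| slot.take())
                    .ok_or_else(|| format!("block {i} does not exist or was already taken"))
            }
            EmitTo::All | EmitTo::First(_) if self.taken_any => {
                Err("cannot emit All or First after TakeBlock".to_string())
            }
            EmitTo::All => {
                let all = self.flatten();
                self.blocks.clear();
                self.num_groups = 0;
                Ok(all)
            }
            // Existing shifting semantics: emit the first n groups, then
            // re-chunk the rest so that group n becomes group 0.
            EmitTo::First(n) => {
                let mut emitted = self.flatten();
                let rest = emitted.split_off(n.min(emitted.len()));
                self.blocks = rest.chunks(self.block_size).map(|c| Some(c.to_vec())).collect();
                self.num_groups = rest.len();
                Ok(emitted)
            }
        }
    }
}
```

In this sketch, blocks can be taken in any order because each one is independent; a second `TakeBlock` of the same index, or any `All`/`First` after a take, returns an error, following the rule in the proposed doc comment. `All` and `First` fall back to flattening the blocks, which is exactly the copying that the `TakeBlock` path avoids.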
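The block-alignment alternative from the `row_hash.rs` comment can be illustrated in the same spirit. In this hypothetical sketch (not the PR's code), the group keys and an aggregate column share one `block_size`, and each group index is a `u64` that packs a block id into the high 32 bits and an in-block offset into the low 32 bits. That packing is an assumption made for illustration; the review does not say how the PR interprets its `u64` indices. Because block `i` of the keys and block `i` of the aggregate cover exactly the same groups, the hypothetical `emit_block` helper can return both slices as one batch without concatenating anything.

```rust
/// Hypothetical index packing (an assumption, not the PR's encoding):
/// high 32 bits = block id, low 32 bits = offset within the block.
pub fn pack(block_id: u32, offset: u32) -> u64 {
    ((block_id as u64) << 32) | offset as u64
}

pub fn unpack(index: u64) -> (u32, u32) {
    ((index >> 32) as u32, index as u32)
}

/// Hypothetical column stored in fixed-size blocks; used here for both
/// the group keys and an aggregate's state.
pub struct BlockedColumn<T> {
    pub block_size: usize,
    pub blocks: Vec<Vec<T>>,
}

impl<T> BlockedColumn<T> {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        Self { block_size, blocks: Vec::new() }
    }

    /// Append a value for a new group and return its packed index.
    pub fn push(&mut self, value: T) -> u64 {
        let needs_block = self.blocks.last().map_or(true, |b| b.len() == self.block_size);
        if needs_block {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        let block_id = self.blocks.len() - 1;
        let block = &mut self.blocks[block_id];
        block.push(value);
        pack(block_id as u32, (block.len() - 1) as u32)
    }

    pub fn get_mut(&mut self, index: u64) -> &mut T {
        let (block, offset) = unpack(index);
        &mut self.blocks[block as usize][offset as usize]
    }
}

/// Emit block `i` as one batch. With aligned block sizes, the key block and
/// the aggregate block line up row for row, so no concatenation is needed.
pub fn emit_block<'a, K, V>(
    keys: &'a BlockedColumn<K>,
    values: &'a BlockedColumn<V>,
    i: usize,
) -> (&'a [K], &'a [V]) {
    assert_eq!(keys.block_size, values.block_size, "block sizes must be aligned");
    let (k, v) = (keys.blocks[i].as_slice(), values.blocks[i].as_slice());
    assert_eq!(k.len(), v.len(), "keys and values must cover the same groups");
    (k, v)
}
```

If the block sizes differed (say 4 for keys and 3 for an aggregate), block `i` of each would describe different groups. Emission would then have to re-slice and concatenate arrays, which is the stitching the review comment proposes to avoid.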