alamb commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1723125203


##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##########
@@ -458,6 +717,91 @@ fn initialize_builder(
     builder
 }
 
+/// Similar as the [initialize_builder] but designed for the blocked version 
accumulator
+fn ensure_enough_room_for_nulls(
+    builder_blocks: &mut Blocks<BooleanBufferBuilder>,
+    mode: GroupStatesMode,
+    total_num_groups: usize,
+    default_value: bool,
+) {
+    if total_num_groups == 0 {
+        return;
+    }
+
+    match mode {
+        // It flat mode, we just a single builder, and grow it constantly.
+        GroupStatesMode::Flat => {
+            if builder_blocks.num_blocks() == 0 {
+                builder_blocks.push_block(BooleanBufferBuilder::new(0));
+            }
+
+            let builder = builder_blocks.current_mut().unwrap();
+            if builder.len() < total_num_groups {
+                let new_groups = total_num_groups - builder.len();
+                builder.append_n(new_groups, default_value);
+            }
+        }
+        // In blocked mode, we ensure the blks are enough first,
+        // and then ensure slots in blks are enough.
+        GroupStatesMode::Blocked(blk_size) => {
+            let (mut cur_blk_idx, exist_slots) = if 
builder_blocks.num_blocks() > 0 {
+                let cur_blk_idx = builder_blocks.num_blocks() - 1;
+                let exist_slots = (builder_blocks.num_blocks() - 1) * blk_size
+                    + builder_blocks.current().unwrap().len();
+
+                (cur_blk_idx, exist_slots)
+            } else {
+                (0, 0)
+            };
+
+            // No new groups, don't need to expand, just return.
+            if exist_slots >= total_num_groups {
+                return;
+            }
+
+            // Ensure blks are enough
+            let exist_blks = builder_blocks.num_blocks();
+            let new_blks = (total_num_groups + blk_size - 1) / blk_size - 
exist_blks;
+            if new_blks > 0 {
+                for _ in 0..new_blks {
+                    
builder_blocks.push_block(BooleanBufferBuilder::new(blk_size));
+                }
+            }
+
+            // Ensure slots are enough.
+            let mut new_slots = total_num_groups - exist_slots;
+
+            // Expand current blk.
+            let cur_blk_rest_slots = blk_size - 
builder_blocks[cur_blk_idx].len();
+            if cur_blk_rest_slots >= new_slots {
+                builder_blocks[cur_blk_idx].append_n(new_slots, default_value);
+                return;
+            }
+
+            // Expand current blk to full, and expand next blks
+            builder_blocks[cur_blk_idx].append_n(cur_blk_rest_slots, 
default_value);
+            new_slots -= cur_blk_rest_slots;
+            cur_blk_idx += 1;
+
+            // Expand blks
+            let expand_blks = new_slots / blk_size;
+            for _ in 0..expand_blks {
+                builder_blocks[cur_blk_idx].append_n(blk_size, default_value);
+                cur_blk_idx += 1;
+            }
+
+            // Expand the last blk.
+            let last_expand_slots = new_slots % blk_size;
+            if last_expand_slots > 0 {
+                builder_blocks
+                    .current_mut()
+                    .unwrap()
+                    .append_n(last_expand_slots, default_value);
+            }
+        }
+    }
+}
+

Review Comment:
   Given the amount of code here, I think this should have unit tests that cover the 
basic operation



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
                 std::mem::swap(v, &mut t);
                 t
             }
+            Self::NextBlock(_) => unreachable!(
+                "can not support blocked emission in take_needed, you should 
use take_needed_from_blocks"
+            ),
+        }
+    }
+
+    /// Removes the number of rows from `blocks` required to emit,
+    /// returning a `Vec` with elements taken.
+    ///
+    /// The detailed behavior in different emissions:
+    ///   - For Emit::CurrentBlock, the first block will be taken and return.
+    ///   - For Emit::All and Emit::First, it will be only supported in 
`GroupStatesMode::Flat`,
+    ///     similar as `take_needed`.
+    pub fn take_needed_from_blocks<T>(
+        &self,
+        blocks: &mut VecBlocks<T>,
+        mode: GroupStatesMode,
+    ) -> Vec<T> {
+        match self {
+            Self::All => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+                blocks.pop_first_block().unwrap()
+            }
+            Self::First(n) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+                let block = blocks.current_mut().unwrap();
+                let split_at = min(block.len(), *n);
+
+                // get end n+1,.. values into t
+                let mut t = block.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(block, &mut t);
+                t
+            }
+            Self::NextBlock(_) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+                blocks.pop_first_block().unwrap()
+            }
+        }
+    }
+}
+
+/// Mode for `accumulators` and `group values`

Review Comment:
   👍 



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
                 std::mem::swap(v, &mut t);
                 t
             }
+            Self::NextBlock(_) => unreachable!(
+                "can not support blocked emission in take_needed, you should 
use take_needed_from_blocks"
+            ),
+        }
+    }
+
+    /// Removes the number of rows from `blocks` required to emit,
+    /// returning a `Vec` with elements taken.
+    ///
+    /// The detailed behavior in different emissions:
+    ///   - For Emit::CurrentBlock, the first block will be taken and return.
+    ///   - For Emit::All and Emit::First, it will be only supported in 
`GroupStatesMode::Flat`,
+    ///     similar as `take_needed`.
+    pub fn take_needed_from_blocks<T>(
+        &self,
+        blocks: &mut VecBlocks<T>,
+        mode: GroupStatesMode,
+    ) -> Vec<T> {
+        match self {
+            Self::All => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+                blocks.pop_first_block().unwrap()
+            }
+            Self::First(n) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+                let block = blocks.current_mut().unwrap();
+                let split_at = min(block.len(), *n);
+
+                // get end n+1,.. values into t
+                let mut t = block.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(block, &mut t);
+                t
+            }
+            Self::NextBlock(_) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+                blocks.pop_first_block().unwrap()
+            }
+        }
+    }
+}
+
+/// Mode for `accumulators` and `group values`
+///
+/// Their meanings:
+///   - Flat, the values in them will be managed with a single `Vec`.
+///     It will grow constantly when more and more values are inserted,
+///     that leads to a considerable amount of copying, and finally a bad 
performance.
+///
+///   - Blocked(block_size), the values in them will be managed with multiple 
`Vec`s.
+///     When the block is large enough(reach block_size), a new block will be 
allocated
+///     and used for inserting.
+///     Obviously, this strategy can avoid copying and get a good performance.
+#[derive(Debug, Clone, Copy)]
+pub enum GroupStatesMode {
+    Flat,
+    Blocked(usize),
+}
+
+/// Blocked style group index used in blocked mode group values and 
accumulators

Review Comment:
   A minor comment: it would be great to try to keep the logic 
(`BlockedGroupIndex`, `Blocks`, etc) in another crate, as datafusion-expr 
currently contains mostly logical `Expr`s and interfaces
   
   Perhaps we could move them to 
   datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/
   
   (obviously we can't move some things like `GroupStatesMode` as that is 
needed for `GroupsAccumulator::switch_to_mode`)



##########
datafusion/physical-plan/src/aggregates/group_values/bytes.rs:
##########
@@ -115,6 +116,11 @@ impl<O: OffsetSizeTrait> GroupValues for 
GroupValuesByes<O> {
 
                 emit_group_values
             }
+            EmitTo::NextBlock(_) => {

Review Comment:
   I think supporting chunked emission in groups would also likely help 
ClickBench performance, but that is a natural follow-on project.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to