Rachelint commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1727490829


##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##########
@@ -458,6 +717,91 @@ fn initialize_builder(
     builder
 }
 
+/// Grows the null builders so that together they hold at least
+/// `total_num_groups` slots, filling every new slot with `default_value`.
+///
+/// This is the counterpart of [`initialize_builder`] for the blocked version
+/// of the accumulator.
+///
+/// * `GroupStatesMode::Flat`: a single builder is used (created on first
+///   call) and extended until it holds `total_num_groups` entries.
+/// * `GroupStatesMode::Blocked(blk_size)`: blocks are pushed until
+///   `ceil(total_num_groups / blk_size)` of them exist, then slots are
+///   appended block by block, filling each block up to `blk_size` entries.
+///
+/// Does nothing when `total_num_groups` is zero or when enough slots already
+/// exist.
+fn ensure_enough_room_for_nulls(
+    builder_blocks: &mut Blocks<BooleanBufferBuilder>,
+    mode: GroupStatesMode,
+    total_num_groups: usize,
+    default_value: bool,
+) {
+    if total_num_groups == 0 {
+        return;
+    }
+
+    match mode {
+        // Flat mode: one builder only, which simply keeps growing.
+        GroupStatesMode::Flat => {
+            if builder_blocks.num_blocks() == 0 {
+                builder_blocks.push_block(BooleanBufferBuilder::new(0));
+            }
+
+            let flat_builder = builder_blocks.current_mut().unwrap();
+            let filled = flat_builder.len();
+            if filled < total_num_groups {
+                flat_builder.append_n(total_num_groups - filled, default_value);
+            }
+        }
+        // Blocked mode: first make sure there are enough blocks, then make
+        // sure the blocks contain enough slots.
+        GroupStatesMode::Blocked(blk_size) => {
+            let exist_blks = builder_blocks.num_blocks();
+
+            // Every block before the last one is counted as full; only the
+            // last (current) block may be partially filled.
+            // NOTE(review): assumes `current()` is the block at index
+            // `num_blocks() - 1` -- confirm against `Blocks`.
+            let (first_open_blk, exist_slots) = if exist_blks == 0 {
+                (0, 0)
+            } else {
+                let last_blk = exist_blks - 1;
+                let last_len = builder_blocks.current().unwrap().len();
+                (last_blk, last_blk * blk_size + last_len)
+            };
+
+            // Already large enough, nothing to expand.
+            if total_num_groups <= exist_slots {
+                return;
+            }
+
+            // Push the missing blocks (each starts out empty).
+            let required_blks = (total_num_groups + blk_size - 1) / blk_size;
+            for _ in exist_blks..required_blks {
+                builder_blocks.push_block(BooleanBufferBuilder::new(blk_size));
+            }
+
+            // Hand out the missing slots block by block, starting with the
+            // block that was current on entry. Each block takes whatever room
+            // it has left, capped by the number of slots still pending; the
+            // blocks after the first one were just pushed, so they have
+            // `blk_size` free slots.
+            let mut pending = total_num_groups - exist_slots;
+            let mut blk_idx = first_open_blk;
+            while pending > 0 {
+                let blk = &mut builder_blocks[blk_idx];
+                let fill = pending.min(blk_size - blk.len());
+                blk.append_n(fill, default_value);
+                pending -= fill;
+                blk_idx += 1;
+            }
+        }
+    }
+}
+

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to