jayzhan211 commented on code in PR #11943:
URL: https://github.com/apache/datafusion/pull/11943#discussion_r1724345877


##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -52,8 +71,248 @@ impl EmitTo {
                 std::mem::swap(v, &mut t);
                 t
             }
+            Self::NextBlock(_) => unreachable!(
+                "can not support blocked emission in take_needed, you should 
use take_needed_from_blocks"
+            ),
+        }
+    }
+
+    /// Removes the number of rows from `blocks` required to emit,
+    /// returning a `Vec` with elements taken.
+    ///
+    /// The detailed behavior in different emissions:
+    ///   - For Emit::CurrentBlock, the first block will be taken and return.
+    ///   - For Emit::All and Emit::First, it will be only supported in 
`GroupStatesMode::Flat`,
+    ///     similar as `take_needed`.
+    pub fn take_needed_from_blocks<T>(
+        &self,
+        blocks: &mut VecBlocks<T>,
+        mode: GroupStatesMode,
+    ) -> Vec<T> {
+        match self {
+            Self::All => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+                blocks.pop_first_block().unwrap()
+            }
+            Self::First(n) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Flat));
+
+                let block = blocks.current_mut().unwrap();
+                let split_at = min(block.len(), *n);
+
+                // get end n+1,.. values into t
+                let mut t = block.split_off(split_at);
+                // leave n+1,.. in v
+                std::mem::swap(block, &mut t);
+                t
+            }
+            Self::NextBlock(_) => {
+                debug_assert!(matches!(mode, GroupStatesMode::Blocked(_)));
+                blocks.pop_first_block().unwrap()
+            }
+        }
+    }
+}
+
+/// Mode for `accumulators` and `group values`
+///
+/// Their meanings:
+///   - Flat, the values in them will be managed with a single `Vec`.
+///     It will grow constantly when more and more values are inserted,
+///     that leads to a considerable amount of copying, and finally a bad 
performance.
+///
+///   - Blocked(block_size), the values in them will be managed with multiple 
`Vec`s.
+///     When the block is large enough(reach block_size), a new block will be 
allocated
+///     and used for inserting.
+///     Obviously, this strategy can avoid copying and get a good performance.
+#[derive(Debug, Clone, Copy)]
+pub enum GroupStatesMode {
+    Flat,
+    Blocked(usize),
+}
+
+/// Blocked style group index used in blocked mode group values and 
accumulators
+///
+/// Parts in index:
+///   - High 32 bits represent `block_id`
+///   - Low 32 bits represent `block_offset`
+#[derive(Debug, Clone, Copy)]
+pub struct BlockedGroupIndex {
+    pub block_id: usize,
+    pub block_offset: usize,
+}
+
+impl BlockedGroupIndex {
+    pub fn new(group_index: usize) -> Self {
+        let block_id =
+            ((group_index as u64 >> 32) & BLOCKED_INDEX_LOW_32_BITS_MASK) as 
usize;
+        let block_offset =
+            ((group_index as u64) & BLOCKED_INDEX_LOW_32_BITS_MASK) as usize;
+
+        Self {
+            block_id,
+            block_offset,
         }
     }
+
+    pub fn new_from_parts(block_id: usize, block_offset: usize) -> Self {
+        Self {
+            block_id,
+            block_offset,
+        }
+    }
+
+    pub fn as_packed_index(&self) -> usize {
+        ((((self.block_id as u64) << 32) & BLOCKED_INDEX_HIGH_32_BITS_MASK)
+            | (self.block_offset as u64 & BLOCKED_INDEX_LOW_32_BITS_MASK))
+            as usize
+    }
+}
+
+/// The basic data structure for blocked aggregation intermediate results
+pub struct Blocks<T> {
+    /// The current block, it should be pushed into `previous`
+    /// when next block is pushed
+    current: Option<T>,
+
+    /// Previous blocks pushed before `current`
+    previous: VecDeque<T>,

Review Comment:
   What is the reason for having separate `current` and `previous` fields 
instead of a single `VecDeque<T>`? Is it a performance concern?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to