alamb commented on code in PR #15591:
URL: https://github.com/apache/datafusion/pull/15591#discussion_r2074192086


##########
datafusion/core/tests/fuzz_cases/aggregation_fuzzer/context_generator.rs:
##########
@@ -146,11 +147,14 @@ impl SessionContextGenerator {
                 (provider, false)
             };
 
+        let enable_aggregation_blocked_groups = rng.gen_bool(0.5);

Review Comment:
   I wonder if there is any value in testing the old code path 
(`enable_aggregation_blocked_groups = false`) if our goal is to remove it 
eventually.
   
   I recommend only testing with the flag set to the default value



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -39,6 +43,9 @@ impl EmitTo {
     /// remaining values in `v`.
     ///
     /// This avoids copying if Self::All
+    ///
+    /// NOTICE: only support emit strategies: `Self::All` and `Self::First`

Review Comment:
   ```suggestion
       /// NOTICE: only support emit strategies: `Self::All` and `Self::First`
       /// Will call `panic` if called with `Self::NextBlock`
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -982,6 +1099,9 @@ impl GroupedHashAggregateStream {
             && self.update_memory_reservation().is_err()
         {
             assert_ne!(self.mode, AggregateMode::Partial);
+            // TODO: support spilling when blocked group optimization is on

Review Comment:
   I think spilling with blocks is actually likely to perform much better 
anyway (for example, we could potentially spill each block in parallel)



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##########
@@ -212,25 +234,261 @@ impl NullState {
     ///
     /// resets the internal state appropriately
     pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
-        let nulls: BooleanBuffer = self.seen_values.finish();
+        self.seen_values.emit(emit_to)
+    }
+}
 
-        let nulls = match emit_to {
-            EmitTo::All => nulls,
-            EmitTo::First(n) => {
-                // split off the first N values in seen_values
-                //
-                // TODO make this more efficient rather than two
-                // copies and bitwise manipulation
-                let first_n_null: BooleanBuffer = 
nulls.iter().take(n).collect();
-                // reset the existing seen buffer
-                for seen in nulls.iter().skip(n) {
-                    self.seen_values.append(seen);
+/// Adapter for supporting dynamic dispatching of [`FlatNullState`] and 
[`BlockedNullState`].
+/// For performance, the cost of batch-level dynamic dispatching is acceptable.
+#[derive(Debug)]
+pub enum NullStateAdapter {
+    Flat(FlatNullState),
+    Blocked(BlockedNullState),
+}
+
+impl NullStateAdapter {
+    pub fn new(block_size: Option<usize>) -> Self {
+        if let Some(blk_size) = block_size {
+            Self::Blocked(BlockedNullState::new(blk_size))
+        } else {
+            Self::Flat(FlatNullState::new())
+        }
+    }
+
+    pub fn accumulate<T, F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &PrimitiveArray<T>,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        T: ArrowPrimitiveType + Send,
+        F: FnMut(u32, u64, T::Native) + Send,
+    {
+        match self {
+            NullStateAdapter::Flat(null_state) => null_state.accumulate(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+            NullStateAdapter::Blocked(null_state) => null_state.accumulate(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+        }
+    }
+
+    pub fn accumulate_boolean<F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &BooleanArray,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        F: FnMut(u32, u64, bool) + Send,
+    {
+        match self {
+            NullStateAdapter::Flat(null_state) => 
null_state.accumulate_boolean(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+            NullStateAdapter::Blocked(null_state) => 
null_state.accumulate_boolean(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+        }
+    }
+
+    pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+        match self {
+            NullStateAdapter::Flat(null_state) => null_state.build(emit_to),
+            NullStateAdapter::Blocked(null_state) => null_state.build(emit_to),
+        }
+    }
+
+    pub fn size(&self) -> usize {
+        match self {
+            NullStateAdapter::Flat(null_state) => null_state.size(),
+            NullStateAdapter::Blocked(null_state) => null_state.size(),
+        }
+    }
+
+    /// Clone and build a single [`BooleanBuffer`] from `seen_values`,
+    /// only used for testing.
+    #[cfg(test)]
+    fn build_cloned_seen_values(&self) -> BooleanBuffer {
+        match self {
+            NullStateAdapter::Flat(null_state) => {
+                null_state.seen_values[0].finish_cloned()
+            }
+            NullStateAdapter::Blocked(null_state) => {
+                let mut return_builder = BooleanBufferBuilder::new(0);
+                let num_blocks = null_state.seen_values.len();
+                for blk_idx in 0..num_blocks {
+                    let builder = &null_state.seen_values[blk_idx];
+                    for idx in 0..builder.len() {
+                        return_builder.append(builder.get_bit(idx));
+                    }
+                }
+                return_builder.finish()
+            }
+        }
+    }
+
+    #[cfg(test)]
+    fn build_all_in_once(&mut self) -> NullBuffer {
+        match self {
+            NullStateAdapter::Flat(null_state) => 
null_state.build(EmitTo::All),
+            NullStateAdapter::Blocked(null_state) => {
+                let mut return_builder = BooleanBufferBuilder::new(0);
+                let num_blocks = null_state.seen_values.len();
+                for _ in 0..num_blocks {
+                    let blocked_nulls = null_state.build(EmitTo::NextBlock);
+                    for bit in blocked_nulls.inner().iter() {
+                        return_builder.append(bit);
+                    }
                 }
-                first_n_null
+
+                NullBuffer::new(return_builder.finish())
             }
+        }
+    }
+}
+
+/// [`NullState`] for `flat groups input`
+///
+/// At first, you may need to see something about `block_id` and `block_offset`
+/// from [`GroupsAccumulator::supports_blocked_groups`].
+///
+/// The `flat groups input` are organized like:
+///
+/// ```text
+///     row_0 group_index_0
+///     row_1 group_index_1
+///     row_2 group_index_2
+///     ...
+///     row_n group_index_n     
+/// ```
+///
+/// If `row_x group_index_x` is not filtered(`group_index_x` is seen)
+/// `seen_values[group_index_x]` will be set to `true`.
+///
+/// For `set_bit(block_id, block_offset, value)`, `block_id` is unused,
+/// `block_offset` will be set to `group_index`.
+///
+/// [`GroupsAccumulator::supports_blocked_groups`]: 
datafusion_expr_common::groups_accumulator::GroupsAccumulator::supports_blocked_groups
+///
+pub type FlatNullState = NullState<FlatGroupIndexOperations>;
+
+impl FlatNullState {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl Default for FlatNullState {
+    fn default() -> Self {
+        Self {
+            seen_values: Blocks::new(None),
+            _phantom: PhantomData {},
+        }
+    }
+}
+
+/// [`NullState`] for `blocked groups input`

Review Comment:
   I really like how you have `NullState` parameterized now



##########
datafusion/expr-common/src/groups_accumulator.rs:
##########
@@ -250,4 +288,30 @@ pub trait GroupsAccumulator: Send {
     /// This function is called once per batch, so it should be `O(n)` to
     /// compute, not `O(num_groups)`
     fn size(&self) -> usize;
+
+    /// Returns `true` if this accumulator supports blocked groups.
+    fn supports_blocked_groups(&self) -> bool {
+        false
+    }
+
+    /// Alter the block size in the accumulator
+    ///
+    /// If the target block size is `None`, it will use a single big
+    /// block(can think it a `Vec`) to manage the state.
+    ///
+    /// If the target block size` is `Some(blk_size)`, it will try to
+    /// set the block size to `blk_size`, and the try will only success
+    /// when the accumulator has supported blocked mode.
+    ///
+    /// NOTICE: After altering block size, all data in previous will be 
cleared.

Review Comment:
   ```suggestion
       /// NOTICE: After altering block size, all data in existing accumulators 
will be cleared.
   ```



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/accumulate.rs:
##########
@@ -212,25 +234,261 @@ impl NullState {
     ///
     /// resets the internal state appropriately
     pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
-        let nulls: BooleanBuffer = self.seen_values.finish();
+        self.seen_values.emit(emit_to)
+    }
+}
 
-        let nulls = match emit_to {
-            EmitTo::All => nulls,
-            EmitTo::First(n) => {
-                // split off the first N values in seen_values
-                //
-                // TODO make this more efficient rather than two
-                // copies and bitwise manipulation
-                let first_n_null: BooleanBuffer = 
nulls.iter().take(n).collect();
-                // reset the existing seen buffer
-                for seen in nulls.iter().skip(n) {
-                    self.seen_values.append(seen);
+/// Adapter for supporting dynamic dispatching of [`FlatNullState`] and 
[`BlockedNullState`].
+/// For performance, the cost of batch-level dynamic dispatching is acceptable.
+#[derive(Debug)]
+pub enum NullStateAdapter {
+    Flat(FlatNullState),
+    Blocked(BlockedNullState),
+}
+
+impl NullStateAdapter {
+    pub fn new(block_size: Option<usize>) -> Self {
+        if let Some(blk_size) = block_size {
+            Self::Blocked(BlockedNullState::new(blk_size))
+        } else {
+            Self::Flat(FlatNullState::new())
+        }
+    }
+
+    pub fn accumulate<T, F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &PrimitiveArray<T>,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        T: ArrowPrimitiveType + Send,
+        F: FnMut(u32, u64, T::Native) + Send,
+    {
+        match self {
+            NullStateAdapter::Flat(null_state) => null_state.accumulate(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+            NullStateAdapter::Blocked(null_state) => null_state.accumulate(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+        }
+    }
+
+    pub fn accumulate_boolean<F>(
+        &mut self,
+        group_indices: &[usize],
+        values: &BooleanArray,
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+        value_fn: F,
+    ) where
+        F: FnMut(u32, u64, bool) + Send,
+    {
+        match self {
+            NullStateAdapter::Flat(null_state) => 
null_state.accumulate_boolean(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+            NullStateAdapter::Blocked(null_state) => 
null_state.accumulate_boolean(
+                group_indices,
+                values,
+                opt_filter,
+                total_num_groups,
+                value_fn,
+            ),
+        }
+    }
+
+    pub fn build(&mut self, emit_to: EmitTo) -> NullBuffer {
+        match self {
+            NullStateAdapter::Flat(null_state) => null_state.build(emit_to),
+            NullStateAdapter::Blocked(null_state) => null_state.build(emit_to),
+        }
+    }
+
+    pub fn size(&self) -> usize {
+        match self {
+            NullStateAdapter::Flat(null_state) => null_state.size(),
+            NullStateAdapter::Blocked(null_state) => null_state.size(),
+        }
+    }
+
+    /// Clone and build a single [`BooleanBuffer`] from `seen_values`,
+    /// only used for testing.
+    #[cfg(test)]
+    fn build_cloned_seen_values(&self) -> BooleanBuffer {
+        match self {
+            NullStateAdapter::Flat(null_state) => {
+                null_state.seen_values[0].finish_cloned()
+            }
+            NullStateAdapter::Blocked(null_state) => {
+                let mut return_builder = BooleanBufferBuilder::new(0);
+                let num_blocks = null_state.seen_values.len();
+                for blk_idx in 0..num_blocks {
+                    let builder = &null_state.seen_values[blk_idx];
+                    for idx in 0..builder.len() {
+                        return_builder.append(builder.get_bit(idx));
+                    }
+                }
+                return_builder.finish()
+            }
+        }
+    }
+
+    #[cfg(test)]
+    fn build_all_in_once(&mut self) -> NullBuffer {
+        match self {
+            NullStateAdapter::Flat(null_state) => 
null_state.build(EmitTo::All),
+            NullStateAdapter::Blocked(null_state) => {
+                let mut return_builder = BooleanBufferBuilder::new(0);
+                let num_blocks = null_state.seen_values.len();
+                for _ in 0..num_blocks {
+                    let blocked_nulls = null_state.build(EmitTo::NextBlock);
+                    for bit in blocked_nulls.inner().iter() {
+                        return_builder.append(bit);
+                    }
                 }
-                first_n_null
+
+                NullBuffer::new(return_builder.finish())
             }
+        }
+    }
+}
+
+/// [`NullState`] for `flat groups input`
+///
+/// At first, you may need to see something about `block_id` and `block_offset`
+/// from [`GroupsAccumulator::supports_blocked_groups`].
+///
+/// The `flat groups input` are organized like:
+///
+/// ```text
+///     row_0 group_index_0
+///     row_1 group_index_1
+///     row_2 group_index_2
+///     ...
+///     row_n group_index_n     
+/// ```
+///
+/// If `row_x group_index_x` is not filtered(`group_index_x` is seen)
+/// `seen_values[group_index_x]` will be set to `true`.
+///
+/// For `set_bit(block_id, block_offset, value)`, `block_id` is unused,
+/// `block_offset` will be set to `group_index`.
+///
+/// [`GroupsAccumulator::supports_blocked_groups`]: 
datafusion_expr_common::groups_accumulator::GroupsAccumulator::supports_blocked_groups
+///
+pub type FlatNullState = NullState<FlatGroupIndexOperations>;
+
+impl FlatNullState {
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl Default for FlatNullState {
+    fn default() -> Self {
+        Self {
+            seen_values: Blocks::new(None),
+            _phantom: PhantomData {},
+        }
+    }
+}
+
+/// [`NullState`] for `blocked groups input`
+///
+/// At first, you may need to see something about `block_id` and `block_offset`
+/// from [`GroupsAccumulator::supports_blocked_groups`].
+///
+/// The `flat groups input` are organized like:
+///
+/// ```text
+///     row_0 (block_id_0, block_offset_0)
+///     row_1 (block_id_1, block_offset_1)
+///     row_2 (block_id_1, block_offset_1)

Review Comment:
   Should this have different block offsets?
   
   ```suggestion
   ///     row_1 (block_id_1, block_offset_0)
   ///     row_2 (block_id_1, block_offset_1)
   ```



##########
datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/blocks.rs:
##########
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Aggregation intermediate results blocks in blocked approach
+
+use std::{
+    collections::VecDeque,
+    fmt::Debug,
+    iter,
+    ops::{Index, IndexMut},
+};
+
+use datafusion_expr_common::groups_accumulator::EmitTo;
+
+/// Structure used to store aggregation intermediate results in `blocked 
approach`
+///
+/// Aggregation intermediate results will be stored as multiple [`Block`]s
+/// (simply you can think a [`Block`] as a `Vec`). And `Blocks` is the 
structure
+/// to represent such multiple [`Block`]s.
+///
+/// More details about `blocked approach` can see in: 
[`GroupsAccumulator::supports_blocked_groups`].

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to