mustafasrepo commented on code in PR #6904:
URL: https://github.com/apache/arrow-datafusion/pull/6904#discussion_r1259775330
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -16,100 +16,181 @@
// under the License.
//! Hash aggregation through row format
+//!
+//! POC demonstration of GroupByHashApproach
-use std::cmp::min;
-use std::ops::Range;
+use datafusion_physical_expr::{
+ AggregateExpr, GroupsAccumulator, GroupsAccumulatorAdapter,
+};
+use log::debug;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec;
use ahash::RandomState;
-use arrow::row::{RowConverter, SortField};
+use arrow::row::{RowConverter, Rows, SortField};
use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};
-use crate::physical_plan::aggregates::utils::{
- aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
- read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
-};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema,
AggregateMode,
- PhysicalGroupBy, RowAccumulatorItem,
+ PhysicalGroupBy,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
-use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
+use crate::physical_plan::{aggregates, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::*;
-use arrow::compute::cast;
-use arrow::datatypes::DataType;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
-use datafusion_common::cast::as_boolean_array;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
-use datafusion_expr::Accumulator;
-use datafusion_row::accessor::RowAccessor;
-use datafusion_row::layout::RowLayout;
use hashbrown::raw::RawTable;
-use itertools::izip;
+
+#[derive(Debug, Clone)]
+/// This object tracks the aggregation phase (input/output)
+pub(crate) enum ExecutionState {
+ ReadingInput,
+ /// When producing output, the remaining rows to output are stored
+ /// here and are sliced off as needed in batch_size chunks
+ ProducingOutput(RecordBatch),
+ Done,
+}
use super::AggregateExec;
-/// Grouping aggregate with row-format aggregation states inside.
+/// Hash based Grouping Aggregator
+///
+/// # Design Goals
+///
+/// This structure is designed so that updating the aggregates can be
+/// vectorized (done in a tight loop) without allocatons. The
Review Comment:
```suggestion
/// vectorized (done in a tight loop) without allocations. The
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]