avantgardnerio commented on code in PR #7192:
URL: https://github.com/apache/arrow-datafusion/pull/7192#discussion_r1286535561
##########
datafusion/core/src/physical_plan/aggregates/priority_queue.rs:
##########
@@ -0,0 +1,235 @@
+use uuid::Uuid;
+use std::collections::{BTreeMap, HashMap};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::util::pretty::print_batches;
+use arrow_array::{RecordBatch};
+use arrow_schema::{DataType, SchemaRef, SortOptions};
+use futures::stream::{Stream, StreamExt};
+use hashbrown::HashSet;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr};
+use crate::physical_plan::aggregates::{aggregate_expressions, AggregateExec, evaluate_group_by, evaluate_many, group_schema, PhysicalGroupBy};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_common::Result;
+use datafusion_physical_expr::expressions::{Max, Min};
+
+pub(crate) struct GroupedPriorityQueueAggregateStream {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    group_by: PhysicalGroupBy,
+    group_converter: RowConverter,
+    value_converter: RowConverter, // TODO: use accumulators
+    group_to_val: HashMap<OwnedRow, OwnedRow>, // TODO: BTreeMap->BinaryHeap, OwnedRow->Rows
+    val_to_group: BTreeMap<OwnedRow, HashSet<OwnedRow>>,
Review Comment:
@alamb I think the `BTreeMap<OwnedRow, HashSet<OwnedRow>>` might be the most
efficient overall implementation.
I'm working on a version that:
1. doesn't modify anything if the new value seen is less than the min (or greater than the max) of the working set
2. doesn't modify anything if the value for a group_id meets # 1
3. adds the HashSets if needed
4. upon filling all slots with HashSets, re-uses them to avoid any further heap allocations
@Dandandan hopefully # 4 in that list satisfies your previous concern?
Taking it to its logical conclusion, one could envision fixed-size HashSets allocated up front (https://github.com/Simsys/fchashmap), but then we'd be talking limit^2 memory.
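
For illustration, here is a minimal, self-contained sketch of the update logic described in steps 1-4 above. It is not code from this PR: plain integer keys stand in for arrow's `OwnedRow`, it tracks the `limit` largest values only, and the `TopKTracker` type, its methods, and the `spare_sets` recycling pool are hypothetical names invented for the sketch.

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

/// Hypothetical, simplified top-k tracker: keeps the `limit` groups with the
/// largest values. Plain integers stand in for arrow's `OwnedRow` keys.
struct TopKTracker {
    limit: usize,
    /// group id -> the best value seen for that group
    group_to_val: HashMap<u64, i64>,
    /// value -> the set of groups currently holding that value
    val_to_group: BTreeMap<i64, HashSet<u64>>,
    /// empty HashSets recycled from drained buckets (step 4)
    spare_sets: Vec<HashSet<u64>>,
}

impl TopKTracker {
    fn new(limit: usize) -> Self {
        Self {
            limit,
            group_to_val: HashMap::new(),
            val_to_group: BTreeMap::new(),
            spare_sets: Vec::new(),
        }
    }

    fn insert(&mut self, group: u64, val: i64) {
        // Step 1: if the working set is full and the new value does not beat
        // the current minimum, nothing changes.
        if self.group_to_val.len() >= self.limit && !self.group_to_val.contains_key(&group) {
            if let Some((&min_val, _)) = self.val_to_group.iter().next() {
                if val <= min_val {
                    return;
                }
            }
        }
        // Step 2: if the group is already tracked with an equal or better
        // value, nothing changes either.
        if let Some(&old) = self.group_to_val.get(&group) {
            if val <= old {
                return;
            }
            // The group improved: detach it from its old value bucket.
            self.remove_from_bucket(old, group);
        } else if self.group_to_val.len() >= self.limit {
            // Working set is full: evict one group from the smallest bucket.
            let (&min_val, bucket) = self.val_to_group.iter().next().expect("non-empty map");
            let victim = *bucket.iter().next().expect("non-empty bucket");
            self.group_to_val.remove(&victim);
            self.remove_from_bucket(min_val, victim);
        }
        // Steps 3 & 4: place the group in its new value bucket, reusing a
        // previously allocated HashSet when one is available.
        self.group_to_val.insert(group, val);
        if !self.val_to_group.contains_key(&val) {
            let bucket = self.spare_sets.pop().unwrap_or_default();
            self.val_to_group.insert(val, bucket);
        }
        self.val_to_group.get_mut(&val).expect("just inserted").insert(group);
    }

    /// Remove `group` from the bucket keyed by `val`; if the bucket becomes
    /// empty, recycle its HashSet instead of dropping it.
    fn remove_from_bucket(&mut self, val: i64, group: u64) {
        if let Some(bucket) = self.val_to_group.get_mut(&val) {
            bucket.remove(&group);
            if bucket.is_empty() {
                if let Some(bucket) = self.val_to_group.remove(&val) {
                    self.spare_sets.push(bucket);
                }
            }
        }
    }
}

fn main() {
    let mut topk = TopKTracker::new(2);
    for (group, val) in [(1, 10), (2, 5), (3, 7), (1, 3), (2, 20)] {
        topk.insert(group, val);
    }
    // With limit = 2, only groups 1 (val 10) and 2 (val 20) remain.
    println!("{:?}", topk.group_to_val);
}
```

In this sketch, step 4 is realized by pushing drained buckets onto `spare_sets`: once the first `limit` HashSets have been allocated, subsequent inserts and evictions only shuffle existing sets between buckets rather than allocating new ones.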