Dandandan commented on code in PR #7192:
URL: https://github.com/apache/arrow-datafusion/pull/7192#discussion_r1284113078


##########
datafusion/core/src/physical_plan/aggregates/priority_queue.rs:
##########
@@ -0,0 +1,235 @@
+use uuid::Uuid;
+use std::collections::{BTreeMap, HashMap};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::util::pretty::print_batches;
+use arrow_array::{RecordBatch};
+use arrow_schema::{DataType, SchemaRef, SortOptions};
+use futures::stream::{Stream, StreamExt};
+use hashbrown::HashSet;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr};
+use crate::physical_plan::aggregates::{aggregate_expressions, AggregateExec, evaluate_group_by, evaluate_many, group_schema, PhysicalGroupBy};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_common::Result;
+use datafusion_physical_expr::expressions::{Max, Min};
+
+pub(crate) struct GroupedPriorityQueueAggregateStream {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    group_by: PhysicalGroupBy,
+    group_converter: RowConverter,
+    value_converter: RowConverter, // TODO: use accumulators
+    group_to_val: HashMap<OwnedRow, OwnedRow>, // TODO: BTreeMap->BinaryHeap, OwnedRow->Rows
+    val_to_group: BTreeMap<OwnedRow, HashSet<OwnedRow>>,
+    limit: Option<usize>,
+    id: Uuid,
+    descending: bool,
+}
+
+impl GroupedPriorityQueueAggregateStream {
+    pub fn new(
+        agg: &AggregateExec,
+        context: Arc<TaskContext>,
+        partition: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let agg_schema = Arc::clone(&agg.schema);
+        let agg_group_by = agg.group_by.clone();
+
+        let input = agg.input.execute(partition, Arc::clone(&context))?;
+
+        let aggregate_arguments = aggregate_expressions(
+            &agg.aggr_expr,
+            &agg.mode,
+            agg_group_by.expr.len(),
+        )?;
+
+        let (value_type, descending) = Self::get_minmax_desc(agg)
+            .ok_or_else(|| DataFusionError::Execution("Min/max required".to_string()))?;
+        let sort = SortOptions { descending, nulls_first: false };
+        let value_fields = vec![SortField::new_with_options(value_type, sort)];
+        let value_converter = RowConverter::new(value_fields.clone())?;
+
+        // TODO: filters
+
+        let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
+        // Schema { fields: [Field { name: "trace_id", data_type: UInt64
+        println!("group_schema={group_schema:?}");
+        let group_fields = group_schema
+            .fields()
+            .iter()
+            .map(|f| SortField::new(f.data_type().clone()))
+            .collect::<Vec<_>>();
+        let group_converter = RowConverter::new(group_fields.clone())?;
+
+        Ok(GroupedPriorityQueueAggregateStream {
+            schema: agg_schema,
+            input,
+            aggregate_arguments,
+            group_by: agg_group_by,
+            group_to_val: Default::default(),
+            val_to_group: Default::default(),
+            limit,
+            group_converter,
+            value_converter,
+            id: Uuid::new_v4(),
+            descending,
+        })
+    }
+
+    pub fn get_minmax_desc(agg: &AggregateExec) -> Option<(DataType, bool)> {
+        let agg_expr = match agg.aggr_expr.as_slice() {
+            [] => return None,

Review Comment:
   ```suggestion
   ```
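
The empty suggestion block above appears to ask for the `[] => return None,` arm to be dropped. A minimal sketch of the simplified match (assuming the trailing wildcard arm, visible in the full hunk quoted below, is kept and already covers the empty-slice case):

```rust
// Sketch only: the `[]` arm is redundant because the trailing `_` arm
// already returns None for an empty `aggr_expr` slice.
let agg_expr = match agg.aggr_expr.as_slice() {
    [expr] => expr,
    _ => return None,
};
```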



##########
datafusion/core/src/physical_plan/aggregates/priority_queue.rs:
##########
@@ -0,0 +1,235 @@
+use uuid::Uuid;
+use std::collections::{BTreeMap, HashMap};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use arrow::row::{OwnedRow, RowConverter, SortField};
+use arrow::util::pretty::print_batches;
+use arrow_array::{RecordBatch};
+use arrow_schema::{DataType, SchemaRef, SortOptions};
+use futures::stream::{Stream, StreamExt};
+use hashbrown::HashSet;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{PhysicalExpr};
+use crate::physical_plan::aggregates::{aggregate_expressions, AggregateExec, evaluate_group_by, evaluate_many, group_schema, PhysicalGroupBy};
+use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_common::Result;
+use datafusion_physical_expr::expressions::{Max, Min};
+
+pub(crate) struct GroupedPriorityQueueAggregateStream {
+    schema: SchemaRef,
+    input: SendableRecordBatchStream,
+    aggregate_arguments: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    group_by: PhysicalGroupBy,
+    group_converter: RowConverter,
+    value_converter: RowConverter, // TODO: use accumulators
+    group_to_val: HashMap<OwnedRow, OwnedRow>, // TODO: BTreeMap->BinaryHeap, OwnedRow->Rows
+    val_to_group: BTreeMap<OwnedRow, HashSet<OwnedRow>>,
+    limit: Option<usize>,
+    id: Uuid,
+    descending: bool,
+}
+
+impl GroupedPriorityQueueAggregateStream {
+    pub fn new(
+        agg: &AggregateExec,
+        context: Arc<TaskContext>,
+        partition: usize,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        let agg_schema = Arc::clone(&agg.schema);
+        let agg_group_by = agg.group_by.clone();
+
+        let input = agg.input.execute(partition, Arc::clone(&context))?;
+
+        let aggregate_arguments = aggregate_expressions(
+            &agg.aggr_expr,
+            &agg.mode,
+            agg_group_by.expr.len(),
+        )?;
+
+        let (value_type, descending) = Self::get_minmax_desc(agg)
+            .ok_or_else(|| DataFusionError::Execution("Min/max required".to_string()))?;
+        let sort = SortOptions { descending, nulls_first: false };
+        let value_fields = vec![SortField::new_with_options(value_type, sort)];
+        let value_converter = RowConverter::new(value_fields.clone())?;
+
+        // TODO: filters
+
+        let group_schema = group_schema(&agg_schema, agg_group_by.expr.len());
+        // Schema { fields: [Field { name: "trace_id", data_type: UInt64
+        println!("group_schema={group_schema:?}");
+        let group_fields = group_schema
+            .fields()
+            .iter()
+            .map(|f| SortField::new(f.data_type().clone()))
+            .collect::<Vec<_>>();
+        let group_converter = RowConverter::new(group_fields.clone())?;
+
+        Ok(GroupedPriorityQueueAggregateStream {
+            schema: agg_schema,
+            input,
+            aggregate_arguments,
+            group_by: agg_group_by,
+            group_to_val: Default::default(),
+            val_to_group: Default::default(),
+            limit,
+            group_converter,
+            value_converter,
+            id: Uuid::new_v4(),
+            descending,
+        })
+    }
+
+    pub fn get_minmax_desc(agg: &AggregateExec) -> Option<(DataType, bool)> {
+        let agg_expr = match agg.aggr_expr.as_slice() {
+            [] => return None,
+            [expr] => expr,
+            _ => return None,
+        };
+        let binding = agg_expr.expressions();
+        let expr = match binding.as_slice() {
+            [] => return None,

Review Comment:
   ```suggestion
   ```
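
As with the previous comment, the empty suggestion presumably removes the redundant `[] => return None,` arm, here from the `binding.as_slice()` match. A hedged sketch of the result (the remaining arms are an assumption, since the quoted hunk ends at this line):

```rust
// Assumed shape after removing the empty-slice arm; a single-element
// pattern plus a wildcard is enough to reject everything else.
let expr = match binding.as_slice() {
    [expr] => expr,
    _ => return None,
};
```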



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
