yjshen commented on code in PR #6034:
URL: https://github.com/apache/arrow-datafusion/pull/6034#discussion_r1174509849


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, 
PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Created a sorted parquet exec
+    fn csv_exec_sorted(

Review Comment:
   This could be removed, and there's an identical function 
crate::test::csv_exec_sorted



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -331,10 +408,32 @@ impl AggregateExec {
                     batch_size,
                     context,
                     partition,
+                    state,
                 )?,
             ))
         }
     }
+
+    fn calc_aggregate_state(&self) -> Option<AggregationOrdering> {

Review Comment:
   We have `AggregationState` in row_hash. The word `state` might be confusing 
in aggregate contexts.



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -663,12 +912,32 @@ impl std::fmt::Debug for AggregationState {
 }
 
 impl GroupedHashAggregateStream {
+    /// Prune the groups from the `self.aggr_state.group_states` which are in
+    /// `GroupStatus::Emitted`(this status means that result of this group 
emitted/outputted already, and
+    /// we are sure that these groups cannot receive new rows.) status.
+    fn prune(&mut self) {

Review Comment:
   This is smart!



##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -591,38 +785,93 @@ impl GroupedHashAggregateStream {
                 }
                 let batch_indices = batch_indices.finish();
 
-                let row_values = get_at_indices(&row_aggr_input_values, 
&batch_indices)?;
-                let normal_values =
-                    get_at_indices(&normal_aggr_input_values, &batch_indices)?;
                 let row_filter_values =
                     get_optional_filters(&row_filter_values, &batch_indices);
                 let normal_filter_values =
                     get_optional_filters(&normal_filter_values, 
&batch_indices);
-                self.update_accumulators_using_batch(
-                    &groups_with_rows,
-                    &offsets,
-                    &row_values,
-                    &normal_values,
-                    &row_filter_values,
-                    &normal_filter_values,
-                    &mut allocated,
-                )?;
+                if self
+                    .state
+                    .as_ref()
+                    .map_or(false, |s| s.mode == GroupByOrderMode::Ordered)
+                {
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_aggr_input_values,
+                        &normal_aggr_input_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                } else {
+                    let row_values =
+                        get_at_indices(&row_aggr_input_values, 
&batch_indices)?;
+                    let normal_values =
+                        get_at_indices(&normal_aggr_input_values, 
&batch_indices)?;
+                    self.update_accumulators_using_batch(
+                        &groups_with_rows,
+                        &offsets,
+                        &row_values,
+                        &normal_values,
+                        &row_filter_values,
+                        &normal_filter_values,
+                        &mut allocated,
+                    )?;
+                };
             }
         }
         allocated += self
             .row_converter
             .size()
             .saturating_sub(row_converter_size_pre);
+
+        if self.state.is_some() {
+            let mut new_result = false;
+            let last_ordered_columns = self
+                .aggr_state
+                .group_states
+                .last()
+                .map(|item| item.ordered_columns.clone());
+
+            if let Some(last_ordered_columns) = last_ordered_columns {
+                for cur_group in &mut self.aggr_state.group_states {
+                    if cur_group.ordered_columns != last_ordered_columns {
+                        // We will no longer receive value. Set status to 
GroupStatus::CanEmit
+                        // meaning we can generate result for this group.
+                        cur_group.emit_status = GroupStatus::CanEmit;
+                        new_result = true;
+                    }
+                }
+            }
+            if new_result {
+                self.exec_state = ExecutionState::ProducingOutput;
+            }
+        }
+
         Ok(allocated)
     }
 }
 
+#[derive(Debug, PartialEq)]
+enum GroupStatus {
+    // `CannotEmit` means data for current group is not complete. New data may 
arrive.
+    CannotEmit,

Review Comment:
   `GroupProgress`, `Incomplete`, `ReadyToEmit` maybe.



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -822,13 +930,127 @@ mod tests {
     use std::task::{Context, Poll};
 
     use super::StreamType;
+    use crate::physical_plan::aggregates::GroupByOrderMode::{Ordered, 
PartiallyOrdered};
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::{
         ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
         Statistics,
     };
     use crate::prelude::SessionContext;
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+
+        Ok(schema)
+    }
+
+    // Created a sorted parquet exec
+    fn csv_exec_sorted(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        infinite_source: bool,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+
+        Arc::new(CsvExec::new(
+            FileScanConfig {
+                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+                file_schema: schema.clone(),
+                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+                output_ordering: Some(sort_exprs),
+                infinite_source,
+            },
+            false,
+            0,
+            FileCompressionType::UNCOMPRESSED,
+        ))
+    }
+
+    /// make PhysicalSortExpr with default options
+    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        sort_expr_options(name, schema, SortOptions::default())
+    }
+
+    /// PhysicalSortExpr with specified options
+    fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_working_mode() -> Result<()> {
+        let test_schema = create_test_schema()?;
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC 
NULLS FIRST
+        // Column d, e is not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+        ];
+        let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+        // test cases consists of vector of tuples. Where each tuple 
represents a single test case.
+        // First field in the tuple is Vec<str> where each element in the 
vector represents GROUP BY columns
+        // For instance `vec!["a", "b"]` corresponds to GROUP BY a, b
+        // Second field in the tuple is Option<GroupByOrderMode>, which 
corresponds to expected algorithm mode.
+        // None represents that existing ordering is not sufficient to run 
executor with any one of the algorithms
+        // (We need to add SortExec to be able to run it).
+        // Some(GroupByOrderMode) represents, we can run algorithm with 
existing ordering; and algorithm should work in
+        // GroupByOrderMode.
+        let test_cases = vec![

Review Comment:
   👍



##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -72,6 +74,15 @@ pub enum AggregateMode {
     Single,
 }
 
+/// Group By expression modes
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum GroupByOrderMode {
+    /// Some of the expressions in the GROUP BY clause have an ordering.
+    PartiallyOrdered,

Review Comment:
   `PartiallyOrdered` is left over for follow-up PRs, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to