alamb commented on code in PR #8038:
URL: https://github.com/apache/arrow-datafusion/pull/8038#discussion_r1387027302


##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -86,7 +86,7 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
                                             } else {
                                                 
AggregateMode::SinglePartitioned
                                             };
-                                        AggregateExec::try_new(
+                                        let combined_agg = 
AggregateExec::try_new(

Review Comment:
   Another way to express the same logic with less indenting is:
   
   ```rust
                                           AggregateExec::try_new(
                                               mode,
                                               
input_agg_exec.group_by().clone(),
                                               
input_agg_exec.aggr_expr().to_vec(),
                                               
input_agg_exec.filter_expr().to_vec(),
                                               
input_agg_exec.order_by_expr().to_vec(),
                                               input_agg_exec.input().clone(),
                                               
input_agg_exec.input_schema().clone(),
                                           )
                                           .map(|combined_agg| {
                                               
combined_agg.with_limit(agg_exec.limit())
                                           })
                                           .ok()
                                           .map(Arc::new)
   ```



##########
datafusion/core/benches/data_utils/mod.rs:
##########
@@ -156,3 +161,83 @@ pub fn create_record_batches(
         })
         .collect::<Vec<_>>()
 }
+
+/// Create time series data with `partition_cnt` partitions and `sample_cnt` 
rows per partition

Review Comment:
   Thank you for adding the comments here



##########
datafusion/core/tests/sql/group_by.rs:
##########
@@ -70,6 +70,45 @@ async fn group_by_date_trunc() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn distinct_group_by_limit() -> Result<()> {

Review Comment:
   Does this test add additional coverage compared to the tests in 
`datafusion/sqllogictest/test_files/aggregate.slt`?



##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -800,6 +807,39 @@ impl AggregateExec {
     pub fn group_by(&self) -> &PhysicalGroupBy {
         &self.group_by
     }
+
+    /// true, if this Aggregate has a group-by with no required or explicit 
ordering,
+    /// no filtering and no aggregate expressions
+    /// This method qualifies the use of the LimitedDistinctAggregation 
rewrite rule
+    /// on an AggregateExec.
+    pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool {
+        // ensure there is a group by
+        if self.group_by().is_empty() {
+            return false;
+        }
+        // ensure there are no aggregate expressions
+        if !self.aggr_expr().is_empty() {
+            return false;
+        }
+        // ensure there are no filters on aggregate expressions; the above 
check
+        // may preclude this case
+        if self.filter_expr().iter().any(|e| e.is_some()) {
+            return false;
+        }
+        // ensure there are no order by expressions
+        if self.order_by_expr().iter().any(|e| e.is_some()) {
+            return false;
+        }
+        // ensure there is no output ordering; can this rule be relaxed?

Review Comment:
   It is probably subsumed by the check on the required input ordering, because 
the group operator doesn't introduce any new orderings.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -266,6 +266,12 @@ pub(crate) struct GroupedHashAggregateStream {
 
     /// The spill state object
     spill_state: SpillState,
+
+    /// Optional soft limit on the number of `group_values` in a batch
+    /// If the number of `group_values` in a single batch exceeds this value,
+    /// the `GroupedHashAggregateStream` operation immediately switches to
+    /// output mode and emits all groups.
+    group_values_soft_limit: Option<usize>,

Review Comment:
   I can't see any good way to break this down (other than maybe breaking out 
the benchmarks), so this is fine



##########
datafusion/core/src/physical_optimizer/topk_aggregation.rs:
##########
@@ -118,25 +118,14 @@ impl TopKAggregation {
             }
             Ok(Transformed::No(plan))
         };
-        let child = transform_down_mut(child.clone(), &mut closure).ok()?;
+        let child = child.clone().transform_down_mut(&mut closure).ok()?;
         let sort = SortExec::new(sort.expr().to_vec(), child)
             .with_fetch(sort.fetch())
             .with_preserve_partitioning(sort.preserve_partitioning());
         Some(Arc::new(sort))
     }
 }
 
-fn transform_down_mut<F>(

Review Comment:
   👍  for moving into the trait



##########
datafusion/core/tests/sql/group_by.rs:
##########
@@ -70,6 +70,45 @@ async fn group_by_date_trunc() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn distinct_group_by_limit() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = create_groupby_context(&tmp_dir).await?;
+
+    let sql = "SELECT DISTINCT trace_id from traces group by trace_id limit 4";
+    let dataframe = ctx.sql(sql).await?;
+
+    // ensure we see `lim=[4]`
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let mut expected_physical_plan = r#"
+GlobalLimitExec: skip=0, fetch=4
+  AggregateExec: mode=Single, gby=[trace_id@0 as trace_id], aggr=[], lim=[4]
+    AggregateExec: mode=Single, gby=[trace_id@0 as trace_id], aggr=[], lim=[4]
+    "#
+    .trim()
+    .to_string();
+    let actual_phys_plan =
+        format_plan(physical_plan.clone(), &mut expected_physical_plan);
+    assert_eq!(expected_physical_plan, actual_phys_plan);
+
+    let batches = collect(physical_plan, ctx.task_ctx()).await?;
+    let expected = r#"
++----------+
+| trace_id |
++----------+
+| 0        |
+| 1        |
+| 2        |
+| 3        |
++----------+
+"#
+    .trim();
+    let actual = format!("{}", pretty_format_batches(&batches)?);
+    assert_eq!(actual, expected);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn group_by_limit() -> Result<()> {

Review Comment:
   🤔  I think we could port the rest of these tests to group_by.slt 🤔 (not in 
this PR, I just noticed this while reviewing)



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -448,18 +498,11 @@ impl Stream for GroupedHashAggregateStream {
                         }
                         None => {
                             // inner is done, emit all rows and switch to 
producing output
-                            self.input_done = true;
-                            self.group_ordering.input_done();
-                            let timer = elapsed_compute.timer();
-                            if self.spill_state.spills.is_empty() {
-                                let batch = extract_ok!(self.emit(EmitTo::All, 
false));
-                                self.exec_state = 
ExecutionState::ProducingOutput(batch);
-                            } else {
-                                // If spill files exist, stream-merge them.
-                                extract_ok!(self.update_merged_stream());
-                                self.exec_state = ExecutionState::ReadingInput;
+                            if let Poll::Ready(Some(Err(e))) =

Review Comment:
   I was confused by this at first, as it looks like it discards any batch 
produced by `set_input_done_and_produce_output`: if 
`set_input_done_and_produce_output` returns `Poll::Ready(Some(batch))`, the 
batch just gets dropped 🤔 
   
   However, then I re-reviewed the code and `set_input_done_and_produce_output` 
never returns `Poll::Ready(Some(batch))`. 
   
   I have a thought about how to simplify this code which I will put up as 
another PR for your consideration
   
   I don't think this would prevent this PR from merging



##########
datafusion/core/src/physical_optimizer/optimizer.rs:
##########
@@ -79,6 +80,8 @@ impl PhysicalOptimizer {
             // repartitioning and local sorting steps to meet distribution and 
ordering requirements.
             // Therefore, it should run before EnforceDistribution and 
EnforceSorting.
             Arc::new(JoinSelection::new()),
+            // The LimitedDistinctAggregation rule should be applied before 
the EnforceDistribution rule

Review Comment:
   I think adding the rationale for this limitation would be helpful. I think 
your PR description explains it pretty well:
   
   ```suggestion
               // The LimitedDistinctAggregation rule should be applied before 
the EnforceDistribution rule
               // As that rule may inject other operations in between the 
different AggregateExecs.
               // Applying the rule early means only directly-connected 
AggregateExecs must be examined.
               
   ```



##########
datafusion/core/src/physical_optimizer/topk_aggregation.rs:
##########
@@ -118,25 +118,14 @@ impl TopKAggregation {
             }
             Ok(Transformed::No(plan))
         };
-        let child = transform_down_mut(child.clone(), &mut closure).ok()?;
+        let child = child.clone().transform_down_mut(&mut closure).ok()?;
         let sort = SortExec::new(sort.expr().to_vec(), child)
             .with_fetch(sort.fetch())
             .with_preserve_partitioning(sort.preserve_partitioning());
         Some(Arc::new(sort))
     }
 }
 
-fn transform_down_mut<F>(

Review Comment:
   👍  for moving into the trait



##########
datafusion/physical-plan/src/aggregates/mod.rs:
##########
@@ -800,6 +807,39 @@ impl AggregateExec {
     pub fn group_by(&self) -> &PhysicalGroupBy {
         &self.group_by
     }
+
+    /// true, if this Aggregate has a group-by with no required or explicit 
ordering,
+    /// no filtering and no aggregate expressions
+    /// This method qualifies the use of the LimitedDistinctAggregation 
rewrite rule
+    /// on an AggregateExec.
+    pub fn is_unordered_unfiltered_group_by_distinct(&self) -> bool {
+        // ensure there is a group by
+        if self.group_by().is_empty() {
+            return false;
+        }
+        // ensure there are no aggregate expressions
+        if !self.aggr_expr().is_empty() {
+            return false;
+        }
+        // ensure there are no filters on aggregate expressions; the above 
check
+        // may preclude this case

Review Comment:
   I agree; I don't think you can have a filter without an aggregate expression.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -448,18 +498,11 @@ impl Stream for GroupedHashAggregateStream {
                         }
                         None => {
                             // inner is done, emit all rows and switch to 
producing output
-                            self.input_done = true;
-                            self.group_ordering.input_done();
-                            let timer = elapsed_compute.timer();
-                            if self.spill_state.spills.is_empty() {
-                                let batch = extract_ok!(self.emit(EmitTo::All, 
false));
-                                self.exec_state = 
ExecutionState::ProducingOutput(batch);
-                            } else {
-                                // If spill files exist, stream-merge them.
-                                extract_ok!(self.update_merged_stream());
-                                self.exec_state = ExecutionState::ReadingInput;
+                            if let Poll::Ready(Some(Err(e))) =

Review Comment:
   I was confused by this at first, as it looks like it discards any batch 
produced by `set_input_done_and_produce_output`: if 
`set_input_done_and_produce_output` returns `Poll::Ready(Some(batch))`, the 
batch just gets dropped 🤔 
   
   However, then I re-reviewed the code and `set_input_done_and_produce_output` 
never returns `Poll::Ready(Some(batch))`. 
   
   I have a thought about how to simplify this code which I will put up as 
another PR for your consideration
   
   I don't think this would prevent this PR from merging



##########
datafusion/core/tests/sql/group_by.rs:
##########
@@ -70,6 +70,45 @@ async fn group_by_date_trunc() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn distinct_group_by_limit() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let ctx = create_groupby_context(&tmp_dir).await?;
+
+    let sql = "SELECT DISTINCT trace_id from traces group by trace_id limit 4";
+    let dataframe = ctx.sql(sql).await?;
+
+    // ensure we see `lim=[4]`
+    let physical_plan = dataframe.create_physical_plan().await?;
+    let mut expected_physical_plan = r#"
+GlobalLimitExec: skip=0, fetch=4
+  AggregateExec: mode=Single, gby=[trace_id@0 as trace_id], aggr=[], lim=[4]
+    AggregateExec: mode=Single, gby=[trace_id@0 as trace_id], aggr=[], lim=[4]
+    "#
+    .trim()
+    .to_string();
+    let actual_phys_plan =
+        format_plan(physical_plan.clone(), &mut expected_physical_plan);
+    assert_eq!(expected_physical_plan, actual_phys_plan);
+
+    let batches = collect(physical_plan, ctx.task_ctx()).await?;
+    let expected = r#"
++----------+
+| trace_id |
++----------+
+| 0        |
+| 1        |
+| 2        |
+| 3        |
++----------+
+"#
+    .trim();
+    let actual = format!("{}", pretty_format_batches(&batches)?);
+    assert_eq!(actual, expected);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn group_by_limit() -> Result<()> {

Review Comment:
   🤔  I think we could port the rest of these tests to group_by.slt 🤔 (not in 
this PR, I just noticed this while reviewing)



##########
datafusion/common/src/config.rs:
##########
@@ -427,6 +427,11 @@ config_namespace! {
 config_namespace! {
     /// Options related to query optimization
     pub struct OptimizerOptions {
+        /// When set to true, the optimizer will push a limit operation into
+        /// grouped aggregations which have no aggregate expressions, as a 
soft limit,
+        /// emitting groups once the limit is reached, before all rows in the 
group are read.
+        pub enable_distinct_aggregation_soft_limit: bool, default = true

Review Comment:
   💯  for a disable flag 



##########
datafusion/sqllogictest/test_files/aggregate.slt:
##########
@@ -2504,6 +2504,204 @@ NULL 0 0
 b 0 0
 c 1 1
 
+#
+# Push limit into distinct group-by aggregation tests
+#
+
+# Make results deterministic
+statement ok
+set datafusion.optimizer.repartition_aggregations = false;
+
+#
+query TT
+EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5;
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
+----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
+------TableScan: aggregate_test_100 projection=[c3]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5]
+------------CoalescePartitionsExec
+--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5]
+----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], 
has_header=true
+
+query I
+SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5;
+----
+1
+-40
+29
+-85
+-82
+
+query TT
+EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5 offset 4;
+----
+logical_plan
+Limit: skip=4, fetch=5
+--Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], 
aggr=[[]]
+----TableScan: aggregate_test_100 projection=[c2, c3]
+physical_plan
+GlobalLimitExec: skip=4, fetch=5
+--AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[], lim=[9]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[], 
lim=[9]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3], has_header=true
+
+query II
+SELECT c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5 offset 4;
+----
+5 -82
+4 -111
+3 104
+3 13
+1 38
+
+# The limit should only apply to the aggregations which group by c3
+query TT
+EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 
group by c2, c3 limit 4;
+----
+logical_plan
+Limit: skip=0, fetch=4
+--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
+----Projection: aggregate_test_100.c3
+------Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], 
aggr=[[]]
+--------Filter: aggregate_test_100.c3 >= Int16(10) AND aggregate_test_100.c3 
<= Int16(20)
+----------TableScan: aggregate_test_100 projection=[c2, c3], 
partial_filters=[aggregate_test_100.c3 >= Int16(10), aggregate_test_100.c3 <= 
Int16(20)]
+physical_plan
+GlobalLimitExec: skip=0, fetch=4
+--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[4]
+----CoalescePartitionsExec
+------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[4]
+--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----------ProjectionExec: expr=[c3@1 as c3]
+------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
+--------------CoalescePartitionsExec
+----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], 
aggr=[]
+------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
+----------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, 
c3], has_header=true
+
+query I
+SELECT DISTINCT c3 FROM aggregate_test_100 WHERE c3 between 10 and 20 group by 
c2, c3 limit 4;
+----
+13
+17
+12
+14
+
+# An aggregate expression causes the limit to not be pushed to the aggregation
+query TT
+EXPLAIN SELECT max(c1), c2, c3 FROM aggregate_test_100 group by c2, c3 limit 5;
+----
+logical_plan
+Projection: MAX(aggregate_test_100.c1), aggregate_test_100.c2, 
aggregate_test_100.c3
+--Limit: skip=0, fetch=5
+----Aggregate: groupBy=[[aggregate_test_100.c2, aggregate_test_100.c3]], 
aggr=[[MAX(aggregate_test_100.c1)]]
+------TableScan: aggregate_test_100 projection=[c1, c2, c3]
+physical_plan
+ProjectionExec: expr=[MAX(aggregate_test_100.c1)@2 as 
MAX(aggregate_test_100.c1), c2@0 as c2, c3@1 as c3]
+--GlobalLimitExec: skip=0, fetch=5
+----AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], 
aggr=[MAX(aggregate_test_100.c1)]
+------CoalescePartitionsExec
+--------AggregateExec: mode=Partial, gby=[c2@1 as c2, c3@2 as c3], 
aggr=[MAX(aggregate_test_100.c1)]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c2, c3], has_header=true
+
+# TODO(msirek): Extend checking in LimitedDistinctAggregation equal groupings 
to ignore the order of columns

Review Comment:
   BTW the `equivalence` module (recently worked on by @ozankabak and 
@mustafasrepo) 
https://github.com/apache/arrow-datafusion/blob/15d8c9bf48a56ae9de34d18becab13fd1942dc4a/datafusion/physical-expr/src/equivalence.rs
 has logic to perform this type of analysis.



##########
datafusion/common/src/config.rs:
##########
@@ -427,6 +427,11 @@ config_namespace! {
 config_namespace! {
     /// Options related to query optimization
     pub struct OptimizerOptions {
+        /// When set to true, the optimizer will push a limit operation into
+        /// grouped aggregations which have no aggregate expressions, as a 
soft limit,
+        /// emitting groups once the limit is reached, before all rows in the 
group are read.
+        pub enable_distinct_aggregation_soft_limit: bool, default = true

Review Comment:
   💯  for a disable flag 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to