This is an automated email from the ASF dual-hosted git repository.

avantgardner pushed a commit to branch bg_aggregate_pushdown
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit f52e42abb3cd4d283f1a50359e7370393306910e
Author: Brent Gardner <[email protected]>
AuthorDate: Tue Aug 1 11:40:00 2023 -0600

    Add TODO
---
 datafusion/core/tests/sql/select.rs | 52 ++++++++++++++++++++++++++++++++-----
 1 file changed, 45 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index a7366595c6..46f2be4320 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::util::pretty::{pretty_format_batches};
 use super::*;
 use datafusion_common::ScalarValue;
 use tempfile::TempDir;
@@ -540,13 +541,7 @@ async fn parallel_query_with_filter() -> Result<()> {
     let dataframe = ctx
         .sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
         .await?;
-
-    let plan = dataframe.create_physical_plan().await?;
-
-    let formatted = displayable(plan.as_ref()).indent(true).to_string();
-    assert_eq!("", formatted);
-
-    let results = collect(plan, ctx.task_ctx()).await?;
+    let results = dataframe.collect().await.unwrap();
     let expected = vec![
         "+----+----+",
         "| c1 | c2 |",
@@ -578,6 +573,49 @@ async fn parallel_query_with_filter() -> Result<()> {
     Ok(())
 }
 
+/// Grouped `max` ordered by the aggregate with `LIMIT 2` over a 4-partition CSV
+/// table; checks the physical plan shape and the two rows returned.
+#[tokio::test]
+async fn parallel_query_with_limit() -> Result<()> {
+    let tmp_dir = TempDir::new()?;
+    let partition_count = 4;
+    let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?;
+
+    let sql = "SELECT c3, max(c2) as max FROM test group by c3 order by max desc limit 2";
+    let plan = ctx.sql(sql).await?.create_physical_plan().await?;
+
+    // The CsvExec line embeds this run's temp-dir paths (in an order that may depend
+    // on the filesystem), so redact `file_groups={...}` before comparing.
+    let formatted = displayable(plan.as_ref()).indent(true).to_string();
+    let actual_physical_plan = match formatted.split_once(" file_groups={") {
+        Some((head, tail)) => {
+            let rest = tail.split_once('}').map_or(tail, |(_, rest)| rest);
+            format!("{head} file_groups={{..}}{rest}")
+        }
+        None => formatted.clone(),
+    };
+    // NOTE(review): the `8`s below assume `create_ctx` pins target_partitions to 8 -- confirm.
+    // TODO: need `SortExec: fetch=2, expr=[max@1 DESC]` between the hash
+    // RepartitionExec and the Partial AggregateExec?
+    let expected_physical_plan = r#"
+GlobalLimitExec: skip=0, fetch=2
+  SortPreservingMergeExec: [max@1 DESC], fetch=2
+    SortExec: fetch=2, expr=[max@1 DESC]
+      ProjectionExec: expr=[c3@0 as c3, MAX(test.c2)@1 as max]
+        AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], aggr=[MAX(test.c2)]
+          CoalesceBatchesExec: target_batch_size=8192
+            RepartitionExec: partitioning=Hash([c3@0], 8), input_partitions=8
+              AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[MAX(test.c2)]
+                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4
+                  CsvExec: file_groups={..}, projection=[c2, c3], has_header=true
+    "#.trim();
+    // Trim the actual text too so trailing newlines don't cause a spurious mismatch.
+    assert_eq!(expected_physical_plan, actual_physical_plan.trim_end());
+
+    let batches = collect(plan, ctx.task_ctx()).await?;
+    let actual_rows = pretty_format_batches(&batches)?.to_string();
+    let expected_rows = r#"
++-------+-----+
+| c3    | max |
++-------+-----+
+| true  | 10  |
+| false | 9   |
++-------+-----+
+"#.trim();
+    assert_eq!(expected_rows, actual_rows);
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn boolean_literal() -> Result<()> {
     let results =

Reply via email to