This is an automated email from the ASF dual-hosted git repository.

avantgardner pushed a commit to branch bg_aggregate_pushdown in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit f52e42abb3cd4d283f1a50359e7370393306910e Author: Brent Gardner <[email protected]> AuthorDate: Tue Aug 1 11:40:00 2023 -0600 Add TODO --- datafusion/core/tests/sql/select.rs | 52 ++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index a7366595c6..46f2be4320 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use arrow::util::pretty::{pretty_format_batches}; use super::*; use datafusion_common::ScalarValue; use tempfile::TempDir; @@ -540,13 +541,7 @@ async fn parallel_query_with_filter() -> Result<()> { let dataframe = ctx .sql("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3") .await?; - - let plan = dataframe.create_physical_plan().await?; - - let formatted = displayable(plan.as_ref()).indent(true).to_string(); - assert_eq!("", formatted); - - let results = collect(plan, ctx.task_ctx()).await?; + let results = dataframe.collect().await.unwrap(); let expected = vec![ "+----+----+", "| c1 | c2 |", @@ -578,6 +573,49 @@ async fn parallel_query_with_filter() -> Result<()> { Ok(()) } +#[tokio::test] +async fn parallel_query_with_limit() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?; + + let dataframe = ctx + .sql("SELECT c3, max(c2) as max FROM test group by c3 order by max desc limit 2") + .await?; + + let plan = dataframe.create_physical_plan().await?; + + let actual_physical_plan = displayable(plan.as_ref()).indent(true).to_string(); + let expected_physical_plan = r#" +GlobalLimitExec: skip=0, fetch=2 + SortPreservingMergeExec: [max@1 DESC], fetch=2 + SortExec: fetch=2, expr=[max@1 DESC] + ProjectionExec: expr=[c3@0 as c3, MAX(test.c2)@1 as max] + AggregateExec: mode=FinalPartitioned, gby=[c3@0 as 
c3], aggr=[MAX(test.c2)] + CoalesceBatchesExec: target_batch_size=8192 + RepartitionExec: partitioning=Hash([c3@0], 8), input_partitions=8 + -- TODO: need SortExec: fetch=2, expr=[max@1 DESC] here? + AggregateExec: mode=Partial, gby=[c3@1 as c3], aggr=[MAX(test.c2)] + RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4 + CsvExec: file_groups={4 groups: [[private/var/folders/fn/3l7phz3n14z5z5bg62y80cbc0000gp/T/.tmpXUitb5/partition-3..csv], [private/var/folders/fn/3l7phz3n14z5z5bg62y80cbc0000gp/T/.tmpXUitb5/partition-2..csv], [private/var/folders/fn/3l7phz3n14z5z5bg62y80cbc0000gp/T/.tmpXUitb5/partition-1..csv], [private/var/folders/fn/3l7phz3n14z5z5bg62y80cbc0000gp/T/.tmpXUitb5/partition-0..csv]]}, projection=[c2, c3], has_header=true + "#.trim(); + assert_eq!(expected_physical_plan, actual_physical_plan); + + let batches = collect(plan, ctx.task_ctx()).await?; + let actual_rows = format!("{}", pretty_format_batches(batches.as_slice())?); + let expected = r#" ++-------+-----+ +| c3 | max | ++-------+-----+ +| true | 10 | +| false | 9 | ++-------+-----+ +"#.trim(); + assert_eq!(expected, actual_rows); + + Ok(()) +} + #[tokio::test] async fn boolean_literal() -> Result<()> { let results =
