This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9aacdee138 [MINOR]:Do not introduce unnecessary repartition when row count is 1. (#7832)
9aacdee138 is described below
commit 9aacdee13881f25335d8ea323311949b9799e6d3
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 17 11:18:44 2023 +0300
[MINOR]:Do not introduce unnecessary repartition when row count is 1. (#7832)
* Initial commit
* Fix failing tests
* More idiomatic expressions
* Update tests, use batch size during partition benefit check
* Fix failing tests
* is_exact when row count is 1
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
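For reviewers skimming the wrapped diff below: the core of the change is that `ensure_distribution` now checks each child's statistics and only adds a repartition when the (exact) row count exceeds the configured batch size. A minimal sketch of that check, restated here for readability (the free-standing helper name `repartition_beneficial` is ours; `Statistics` is `datafusion_common::Statistics` as of this commit, with `num_rows: Option<usize>` and `is_exact: bool`):

    use datafusion_common::Statistics;

    // Sketch: repartitioning only pays off when the input may produce
    // more rows than a single batch can hold.
    fn repartition_beneficial(stats: &Statistics, batch_size: usize) -> bool {
        if stats.is_exact {
            // Exact statistics: an unknown row count is assumed large.
            stats.num_rows.map(|n| n > batch_size).unwrap_or(true)
        } else {
            // Inexact statistics: assume repartitioning helps.
            true
        }
    }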
---
datafusion/core/src/datasource/listing/table.rs | 13 +-
.../src/physical_optimizer/enforce_distribution.rs | 34 +++--
datafusion/core/tests/sql/order.rs | 8 +-
datafusion/physical-plan/src/aggregates/mod.rs | 21 ++-
datafusion/sqllogictest/test_files/aggregate.slt | 20 +--
datafusion/sqllogictest/test_files/copy.slt | 2 +-
datafusion/sqllogictest/test_files/groupby.slt | 53 +++++---
datafusion/sqllogictest/test_files/join.slt | 10 +-
datafusion/sqllogictest/test_files/joins.slt | 148 +++++++++++----------
datafusion/sqllogictest/test_files/options.slt | 6 +-
datafusion/sqllogictest/test_files/select.slt | 3 +-
datafusion/sqllogictest/test_files/subquery.slt | 32 +++--
.../sqllogictest/test_files/tpch/q15.slt.part | 27 ++--
datafusion/sqllogictest/test_files/union.slt | 107 ++++++++-------
14 files changed, 270 insertions(+), 214 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0646243669..6ef214c97f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1615,10 +1615,12 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_json_files() -> Result<()> {
+ let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::JSON,
FileCompressionType::UNCOMPRESSED,
- None,
+ Some(config_map),
)
.await?;
Ok(())
@@ -1637,10 +1639,12 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_csv_files() -> Result<()> {
+ let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::CSV,
FileCompressionType::UNCOMPRESSED,
- None,
+ Some(config_map),
)
.await?;
Ok(())
@@ -1648,10 +1652,12 @@ mod tests {
#[tokio::test]
async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> {
+ let mut config_map: HashMap<String, String> = HashMap::new();
+ config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
- None,
+ Some(config_map),
)
.await?;
Ok(())
@@ -1838,6 +1844,7 @@ mod tests {
"datafusion.execution.parquet.write_batch_size".into(),
"5".into(),
);
+ config_map.insert("datafusion.execution.batch_size".into(), "1".into());
helper_test_append_new_files_to_table(
FileType::PARQUET,
FileCompressionType::UNCOMPRESSED,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9be566f10a..52525d1fc4 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1021,6 +1021,7 @@ fn add_hash_on_top(
// until Repartition(Hash).
dist_onward: &mut Option<ExecTree>,
input_idx: usize,
+ repartition_beneficial_stats: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
if n_target == input.output_partitioning().partition_count() && n_target == 1 {
// In this case adding a hash repartition is unnecessary as the hash
@@ -1044,9 +1045,13 @@ fn add_hash_on_top(
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
let should_preserve_ordering = input.output_ordering().is_some();
- // Since hashing benefits from partitioning, add a round-robin repartition
- // before it:
- let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 0)?;
+ let mut new_plan = if repartition_beneficial_stats {
+ // Since hashing benefits from partitioning, add a round-robin repartition
+ // before it:
+ add_roundrobin_on_top(input, n_target, dist_onward, 0)?
+ } else {
+ input
+ };
new_plan = Arc::new(
RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, n_target))?
.with_preserve_order(should_preserve_ordering),
@@ -1223,6 +1228,7 @@ fn ensure_distribution(
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
let repartition_file_scans = config.optimizer.repartition_file_scans;
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
+ let batch_size = config.execution.batch_size;
let is_unbounded = unbounded_output(&dist_context.plan);
// Use order preserving variants either of the conditions true
// - it is desired according to config
@@ -1233,13 +1239,7 @@ fn ensure_distribution(
if dist_context.plan.children().is_empty() {
return Ok(Transformed::No(dist_context));
}
- // Don't need to apply when the returned row count is not greater than 1:
- let stats = dist_context.plan.statistics();
- let mut repartition_beneficial_stat = true;
- if stats.is_exact {
- repartition_beneficial_stat =
- stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
- }
+
// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
mut plan,
@@ -1263,7 +1263,6 @@ fn ensure_distribution(
plan = updated_window;
}
};
-
let n_children = plan.children().len();
// This loop iterates over all the children to:
// - Increase parallelism for every child if it is beneficial.
@@ -1289,9 +1288,19 @@ fn ensure_distribution(
maintains,
child_idx,
)| {
+ // Don't need to apply when the returned row count is not greater than the batch size:
+ let stats = child.statistics();
+ let repartition_beneficial_stats = if stats.is_exact {
+ stats
+ .num_rows
+ .map(|num_rows| num_rows > batch_size)
+ .unwrap_or(true)
+ } else {
+ true
+ };
if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
- && (would_benefit && repartition_beneficial_stat)
+ && (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count, it is not beneficial:
&& child.output_partitioning().partition_count() < target_partitions
{
@@ -1340,6 +1349,7 @@ fn ensure_distribution(
target_partitions,
dist_onward,
child_idx,
+ repartition_beneficial_stats,
)?;
}
Distribution::UnspecifiedDistribution => {}
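The flag computed per child above is threaded into `add_hash_on_top`, which skips the round-robin fan-out when repartitioning is not expected to pay off. A hypothetical unit test against the `repartition_beneficial` sketch from the summary above (the test name and the 8192 default batch size are our illustration):

    #[test]
    fn single_row_input_is_not_repartitioned() {
        // An exact, single-row input never exceeds the default batch size,
        // so no round-robin fan-out should be added in front of the hash.
        let stats = Statistics {
            num_rows: Some(1),
            is_exact: true,
            ..Default::default()
        };
        assert!(!repartition_beneficial(&stats, 8192));
    }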
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
index 0142675bbd..6e3f6319e1 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -209,16 +209,16 @@ ORDER BY 1, 2;
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as
Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2),
input_partitions=2",
- " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as
t], aggr=[]",
- " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
+ " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
+ " AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0
as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as
Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2),
input_partitions=2",
- " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as
t], aggr=[]",
- " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
+ " RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
+ " AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0
as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
];
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index c33f49049d..3c9dabf605 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1021,12 +1021,21 @@ impl ExecutionPlan for AggregateExec {
..Default::default()
}
}
- _ => Statistics {
- // the output row count is surely not larger than its input row count
- num_rows: self.input.statistics().num_rows,
- is_exact: false,
- ..Default::default()
- },
+ _ => {
+ let input_stats = self.input.statistics();
+ // Input statistics are exact and the row count is exactly 1:
+ let is_exact = input_stats.is_exact
+ && (input_stats
+ .num_rows
+ .map(|num_rows| num_rows == 1)
+ .unwrap_or(false));
+ Statistics {
+ // the output row count is surely not larger than its input row count
+ num_rows: self.input.statistics().num_rows,
+ is_exact,
+ ..Default::default()
+ }
+ }
}
}
}
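The hunk above boils down to: an aggregate's output statistics are marked exact only when its input statistics are exact and report exactly one row, since aggregating a single row yields a single row for any grouping. That exact one-row count is what later lets `ensure_distribution` skip the repartition. Restated without the mailer's line wrapping (a sketch, not the verbatim method body):

    let input_stats = self.input.statistics();
    // Exact only for an exact single-row input: one row in, one row out.
    let is_exact = input_stats.is_exact
        && input_stats.num_rows.map(|n| n == 1).unwrap_or(false);
    Statistics {
        // The output row count never exceeds the input row count.
        num_rows: input_stats.num_rows,
        is_exact,
        ..Default::default()
    }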
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index c6f9f42fa2..777b634e93 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -2342,8 +2342,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2397,8 +2397,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
----------------MemoryExec: partitions=1, partition_sizes=[1]
query TT
@@ -2416,8 +2416,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
query TT
@@ -2435,8 +2435,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
query TT
@@ -2454,8 +2454,8 @@ GlobalLimitExec: skip=0, fetch=4
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
----------------MemoryExec: partitions=1, partition_sizes=[1]
query TI
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index a41d1fca66..f2fe216ee8 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -33,7 +33,7 @@ CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_outp
--TableScan: source_table projection=[col1, col2]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
# Error case
query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension!
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index b7070a8d7e..bf93c6633b 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2024,13 +2024,11 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
----------------CoalesceBatchesExec: target_batch_size=8192
------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
--------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
-------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
--------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
-------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2762,9 +2760,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
------------------MemoryExec: partitions=1, partition_sizes=[1]
query TRR
@@ -2799,9 +2797,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
---------------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
------------------MemoryExec: partitions=1, partition_sizes=[1]
query TRR
@@ -2815,6 +2813,11 @@ FRA 50 50
GRC 30 30
TUR 75 75
+# Make sure the batch size is small, so that the query below runs in multiple partitions.
+# The sales_global table has 5 rows, hence we choose a batch size of 4 to stay below it.
+statement ok
+set datafusion.execution.batch_size = 4;
+
# order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
# multi-partitions without group by also.
query TT
@@ -2958,7 +2961,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
--SortExec: expr=[country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
--------------SortExec: expr=[amount@1 ASC NULLS LAST]
@@ -2994,7 +2997,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
--SortExec: expr=[country@0 ASC NULLS LAST]
----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount DESC NULLS FIRST]@3 as fv2]
------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
--------------SortExec: expr=[amount@1 DESC]
@@ -3195,7 +3198,7 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
--SortExec: expr=[sn@0 ASC NULLS LAST]
----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS Int64) as Int64(2) * s.sn]
------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], aggr=[]
--------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
@@ -3248,7 +3251,7 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
--SortExec: expr=[sn@0 ASC NULLS LAST]
----ProjectionExec: expr=[sn@0 as sn, SUM(l.amount)@2 as SUM(l.amount), amount@1 as amount]
------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as amount], aggr=[SUM(l.amount)]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), input_partitions=8
------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], aggr=[SUM(l.amount)]
--------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 as amount]
@@ -3396,7 +3399,7 @@ SortPreservingMergeExec: [sn@2 ASC NULLS LAST]
--SortExec: expr=[sn@2 ASC NULLS LAST]
----ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount]
------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, zip_code@1 as zip_code, country@2 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
----------RepartitionExec: partitioning=Hash([sn@0, zip_code@1, country@2, ts@3, currency@4, amount@5, sum_amount@6], 8), input_partitions=8
------------AggregateExec: mode=Partial, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as sum_amount], aggr=[]
--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
@@ -3566,6 +3569,12 @@ ORDER BY y;
2 1
3 1
+# Make sure to choose a batch size smaller than the row count of the table.
+# In this case we choose 2 (the table has 3 rows);
+# otherwise we won't see parallelism in the tests.
+statement ok
+set datafusion.execution.batch_size = 2;
+
# plan of the query above should contain partial
# and final aggregation stages
query TT
@@ -3579,7 +3588,8 @@ physical_plan
AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)]
-------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
query I
SELECT FIRST_VALUE(x)
@@ -3600,7 +3610,8 @@ physical_plan
AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)]
-------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
query TT
@@ -3644,7 +3655,7 @@ Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_wi
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
--SortExec: expr=[c@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
@@ -3685,7 +3696,7 @@ Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_wi
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
--SortExec: expr=[c@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
--------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt
index 283ff57a98..874d849e9a 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -558,6 +558,10 @@ explain select * from t1 join t2 on false;
logical_plan EmptyRelation
physical_plan EmptyExec: produce_one_row=false
+# Make the batch size smaller than the table row count to introduce parallelism into the plan.
+statement ok
+set datafusion.execution.batch_size = 1;
+
# test convert inner join to cross join when condition is true
query TT
explain select * from t1 inner join t2 on true;
@@ -568,9 +572,9 @@ CrossJoin:
--TableScan: t2 projection=[t2_id, t2_name, t2_int]
physical_plan
CrossJoinExec
---CoalescePartitionsExec
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
+--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----MemoryExec: partitions=1, partition_sizes=[1]
statement ok
drop table IF EXISTS t1;
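The `set datafusion.execution.batch_size` statements added throughout these .slt files exist because the optimizer now skips repartitioning whenever an exact row count fits within one batch; shrinking the batch size below the test tables' row counts keeps the parallel plans these tests assert on. The same knob is available programmatically, sketched here with constructor names as of this DataFusion version (newer releases may rename them):

    use datafusion::prelude::{SessionConfig, SessionContext};

    // Lower the batch size so small in-memory tables still exceed it and
    // the optimizer keeps RepartitionExec nodes in the plan.
    let config = SessionConfig::new().with_batch_size(2);
    let ctx = SessionContext::with_config(config);
    // EXPLAIN queries run through `ctx` will now show the repartitions.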
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 01c0131fdb..cc90e64313 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -24,7 +24,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
statement ok
set datafusion.explain.logical_plan_only = true;
@@ -185,6 +185,10 @@ FROM
statement ok
set datafusion.execution.target_partitions = 2;
+# Make sure to use a batch size smaller than the row count of the table.
+statement ok
+set datafusion.execution.batch_size = 2;
+
##########
## Joins Tests
##########
@@ -1311,13 +1315,13 @@ Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[]]
physical_plan
AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[]
--ProjectionExec: expr=[t1_id@0 as t1_id]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1339,13 +1343,13 @@ physical_plan
ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
--AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[COUNT(*)]
----ProjectionExec: expr=[t1_id@0 as t1_id]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1372,13 +1376,13 @@ ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT join_t1.t1_id)]
--------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
------------ProjectionExec: expr=[t1_id@0 as t1_id]
---------------CoalesceBatchesExec: target_batch_size=4096
+--------------CoalesceBatchesExec: target_batch_size=2
----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
-------------------CoalesceBatchesExec: target_batch_size=4096
+------------------CoalesceBatchesExec: target_batch_size=2
--------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[1]
-------------------CoalesceBatchesExec: target_batch_size=4096
+------------------CoalesceBatchesExec: target_batch_size=2
--------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
----------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1441,7 +1445,7 @@ Projection: join_t1.t1_id, join_t1.t1_name, join_t1.t1_int, join_t2.t2_id, join_
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@4 as t2_id, t2_name@5 as t2_name, t2_int@6 as t2_int]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)]
--------CoalescePartitionsExec
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
@@ -1468,14 +1472,14 @@ Projection: join_t1.t1_id, join_t1.t1_name, join_t1.t1_int, join_t2.t2_id, join_
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@4 as t2_id, t2_name@5 as t2_name, t2_int@6 as t2_int]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([join_t1.t1_id + Int64(11)@3], 2), input_partitions=2
------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([CAST(join_t2.t2_id AS Int64)@3], 2), input_partitions=2
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -1501,7 +1505,7 @@ physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
----------CoalescePartitionsExec
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
@@ -1529,14 +1533,14 @@ physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
--------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -1562,7 +1566,7 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
--------CoalescePartitionsExec
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
@@ -1589,14 +1593,14 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 2), input_partitions=2
------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1622,7 +1626,7 @@ physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
----------CoalescePartitionsExec
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
@@ -1650,14 +1654,14 @@ physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1680,7 +1684,7 @@ Inner Join: join_t1.t1_id = join_t2.t2_id - UInt32(11)
--TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@3)]
------MemoryExec: partitions=1, partition_sizes=[1]
------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
@@ -1703,13 +1707,13 @@ Inner Join: join_t1.t1_id = join_t2.t2_id - UInt32(11)
--TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@3)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@3], 2), input_partitions=2
----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2032,13 +2036,13 @@ Inner Join: Filter: join_t1.t1_id > join_t2.t2_id
------TableScan: join_t2 projection=[t2_id, t2_int]
physical_plan
NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
----FilterExec: t1_id@0 > 10
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]
--CoalescePartitionsExec
----ProjectionExec: expr=[t2_id@0 as t2_id]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------FilterExec: t2_int@1 > 1
----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2073,11 +2077,11 @@ Right Join: Filter: join_t1.t1_id < join_t2.t2_id
physical_plan
NestedLoopJoinExec: join_type=Right, filter=t1_id@0 < t2_id@1
--CoalescePartitionsExec
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------FilterExec: t1_id@0 > 22
--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
----FilterExec: t2_id@0 > 11
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2643,7 +2647,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
# explain sort_merge_join_on_date32 inner sort merge join on data type (Date32)
query TT
@@ -2658,12 +2662,12 @@ Inner Join: t1.c1 = t2.c1
physical_plan
SortMergeJoin: join_type=Inner, on=[(c1@0, c1@0)]
--SortExec: expr=[c1@0 ASC]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
--SortExec: expr=[c1@0 ASC]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2689,13 +2693,13 @@ physical_plan
ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]
--SortMergeJoin: join_type=Right, on=[(CAST(t1.c3 AS Decimal128(10, 2))@4, c3@2)]
----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([CAST(t1.c3 AS Decimal128(10, 2))@4], 2), input_partitions=2
----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
----SortExec: expr=[c3@2 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([c3@2], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2722,7 +2726,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
@@ -2743,7 +2747,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
query TT
explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2751,13 +2755,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2792,13 +2796,13 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2827,7 +2831,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
#Test the left_semi_join scenarios where the current repartition_joins parameter is set to false.
####
@@ -2846,7 +2850,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
query TT
explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2854,7 +2858,7 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id I
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2890,7 +2894,7 @@ explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2920,7 +2924,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
#Test the right_semi_join scenarios where the current repartition_joins parameter is set to true.
@@ -2940,7 +2944,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
query TT
explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -2948,13 +2952,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2970,13 +2974,13 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3002,7 +3006,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
#Test the right_semi_join scenarios where the current repartition_joins parameter is set to false.
@@ -3022,7 +3026,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
query TT
explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3030,7 +3034,7 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 WHER
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@1 != t1_name@0
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -3047,7 +3051,7 @@ explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t2 t2 RIGH
physical_plan
SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
--SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)], filter=t2_name@0 != t1_name@1
--------MemoryExec: partitions=1, partition_sizes=[1]
--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -3074,7 +3078,7 @@ statement ok
set datafusion.execution.target_partitions = 2;
statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
####
@@ -3126,14 +3130,14 @@ physical_plan
SortPreservingMergeExec: [rn1@5 ASC NULLS LAST]
--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
----SortExec: expr=[rn1@5 ASC NULLS LAST]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
--------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
----SortExec: expr=[a@1 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
@@ -3162,12 +3166,12 @@ physical_plan
SortPreservingMergeExec: [rn1@10 ASC NULLS LAST]
--SortMergeJoin: join_type=Right, on=[(a@1, a@1)]
----SortExec: expr=[a@1 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
----SortExec: expr=[rn1@5 ASC NULLS LAST]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
@@ -3203,14 +3207,14 @@ SortPreservingMergeExec: [a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 A
--SortExec: expr=[a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 ASC NULLS LAST]
----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
------SortExec: expr=[a@1 ASC]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
----------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
------SortExec: expr=[a@1 ASC]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
@@ -3245,7 +3249,7 @@ Sort: r_table.rn1 ASC NULLS LAST
--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING
AND UNBOUNDED FOLLOWING]]
----------TableScan: annotated_data projection=[a0, a, b, c, d]
physical_plan
-CoalesceBatchesExec: target_batch_size=4096
+CoalesceBatchesExec: target_batch_size=2
--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@1, a@1)]
----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST],
has_header=true
----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d,
ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
@@ -3272,7 +3276,7 @@ Sort: r_table.rn1 ASC NULLS LAST
--------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING
AND UNBOUNDED FOLLOWING]]
----------TableScan: annotated_data projection=[a0, a, b, c, d]
physical_plan
-CoalesceBatchesExec: target_batch_size=4096
+CoalesceBatchesExec: target_batch_size=2
--HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(a@0, a@1)]
----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a],
output_ordering=[a@0 ASC], has_header=true
----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d,
ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
diff --git a/datafusion/sqllogictest/test_files/options.slt
b/datafusion/sqllogictest/test_files/options.slt
index 5fbb2102f4..83fe85745e 100644
--- a/datafusion/sqllogictest/test_files/options.slt
+++ b/datafusion/sqllogictest/test_files/options.slt
@@ -33,7 +33,7 @@ Filter: a.c0 < Int32(1)
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: c0@0 < 1
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]
##
# test_disable_coalesce
@@ -51,7 +51,7 @@ Filter: a.c0 < Int32(1)
--TableScan: a projection=[c0]
physical_plan
FilterExec: c0@0 < 1
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
statement ok
set datafusion.execution.coalesce_batches = true
@@ -74,7 +74,7 @@ Filter: a.c0 < Int32(1)
physical_plan
CoalesceBatchesExec: target_batch_size=1234
--FilterExec: c0@0 < 1
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]
statement ok
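
(Editorial note, not part of the commit: the MemoryExec changes above show that a
table backed by a single batch is no longer fanned out into mostly empty
partitions. A minimal sketch of how to observe this, assuming the public
SessionContext::register_batch API; table `a` and column `c0` mirror the test
file, the one-row batch is illustrative:)

    use std::sync::Arc;
    use datafusion::arrow::array::{ArrayRef, Int32Array};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        // One batch with a single row, like the `a` table in options.slt.
        let schema = Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, false)]));
        let col: ArrayRef = Arc::new(Int32Array::from(vec![0]));
        let batch = RecordBatch::try_new(schema, vec![col])?;

        let ctx = SessionContext::new();
        ctx.register_batch("a", batch)?;

        // After this change the plan should contain
        // `MemoryExec: partitions=1, partition_sizes=[1]` with no
        // RoundRobinBatch repartition above it.
        ctx.sql("EXPLAIN SELECT c0 FROM a WHERE c0 < 1").await?.show().await?;
        Ok(())
    }
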
diff --git a/datafusion/sqllogictest/test_files/select.slt
b/datafusion/sqllogictest/test_files/select.slt
index b099107358..1d42747976 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -485,8 +485,7 @@ Projection: select_between_data.c1 >= Int64(2) AND
select_between_data.c1 <= Int
--TableScan: select_between_data projection=[c1]
physical_plan
ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as select_between_data.c1
BETWEEN Int64(2) AND Int64(3)]
---RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-----MemoryExec: partitions=1, partition_sizes=[1]
+--MemoryExec: partitions=1, partition_sizes=[1]
# TODO: query_get_indexed_field
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 2eccb60aad..7cbb848f33 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -15,6 +15,10 @@
# specific language governing permissions and limitations
# under the License.
+# Make sure to choose a batch size smaller than the row count of the table.
+statement ok
+set datafusion.execution.batch_size = 2;
+
#############
## Subquery Tests
#############
@@ -178,15 +182,15 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS
t2_sum
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int),
t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as
t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
-------------CoalesceBatchesExec: target_batch_size=8192
+------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
@@ -213,15 +217,15 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int *
Float64(1)) + Int64(1) AS t2
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) +
Int64(1)@1 as t2_sum]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) +
Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as
SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int * Float64(1))]
-------------CoalesceBatchesExec: target_batch_size=8192
+------------CoalesceBatchesExec: target_batch_size=2
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int * Float64(1))]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
@@ -247,16 +251,16 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS
t2_sum
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@1], 4), input_partitions=4
----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as
t2_id]
------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id,
Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
---------------CoalesceBatchesExec: target_batch_size=8192
+--------------CoalesceBatchesExec: target_batch_size=2
----------------RepartitionExec: partitioning=Hash([t2_id@0, Utf8("a")@1], 4),
input_partitions=4
------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as
Utf8("a")], aggr=[SUM(t2.t2_int)]
--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
@@ -285,17 +289,17 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS
t2_sum
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int),
t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as
t2_id]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
------------FilterExec: SUM(t2.t2_int)@1 < 3
--------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
-----------------CoalesceBatchesExec: target_batch_size=8192
+----------------CoalesceBatchesExec: target_batch_size=2
------------------RepartitionExec: partitioning=Hash([t2_id@0], 4),
input_partitions=4
--------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id],
aggr=[SUM(t2.t2_int)]
----------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
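
(Editorial note, not part of the commit: the `SET datafusion.execution.batch_size = 2`
statement added above keeps the tiny test tables larger than one batch, so the
new benefit check still sees repartitioning as worthwhile and the expected
plans keep their RepartitionExec nodes. The same setting can be applied
programmatically through the SQL interface; a minimal sketch:)

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // SET statements are accepted through the SQL interface, exactly as
        // the slt files do with `statement ok`.
        ctx.sql("SET datafusion.execution.batch_size = 2").await?.collect().await?;
        // Plans built from this session now show target_batch_size=2,
        // matching the expected plans in subquery.slt.
        Ok(())
    }
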
diff --git a/datafusion/sqllogictest/test_files/tpch/q15.slt.part
b/datafusion/sqllogictest/test_files/tpch/q15.slt.part
index 4515b8ae1f..a872e96acf 100644
--- a/datafusion/sqllogictest/test_files/tpch/q15.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q15.slt.part
@@ -95,20 +95,19 @@ SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]
----------------------------------FilterExec: l_shipdate@3 >= 9496 AND
l_shipdate@3 < 9587
------------------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_discou [...]
----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec:
partitioning=Hash([MAX(revenue0.total_revenue)@0], 4), input_partitions=4
---------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
-----------------AggregateExec: mode=Final, gby=[],
aggr=[MAX(revenue0.total_revenue)]
-------------------CoalescePartitionsExec
---------------------AggregateExec: mode=Partial, gby=[],
aggr=[MAX(revenue0.total_revenue)]
-----------------------ProjectionExec: expr=[SUM(lineitem.l_extendedprice *
Int64(1) - lineitem.l_discount)@1 as total_revenue]
-------------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0
as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
---------------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------------RepartitionExec: partitioning=Hash([l_suppkey@0],
4), input_partitions=4
-------------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as
l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
---------------------------------ProjectionExec: expr=[l_suppkey@0 as
l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
-----------------------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------------------FilterExec: l_shipdate@3 >= 9496 AND
l_shipdate@3 < 9587
---------------------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_disc [...]
+------------RepartitionExec:
partitioning=Hash([MAX(revenue0.total_revenue)@0], 4), input_partitions=1
+--------------AggregateExec: mode=Final, gby=[],
aggr=[MAX(revenue0.total_revenue)]
+----------------CoalescePartitionsExec
+------------------AggregateExec: mode=Partial, gby=[],
aggr=[MAX(revenue0.total_revenue)]
+--------------------ProjectionExec: expr=[SUM(lineitem.l_extendedprice *
Int64(1) - lineitem.l_discount)@1 as total_revenue]
+----------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0
as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
+------------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------------RepartitionExec: partitioning=Hash([l_suppkey@0],
4), input_partitions=4
+----------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as
l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) -
lineitem.l_discount)]
+------------------------------ProjectionExec: expr=[l_suppkey@0 as l_suppkey,
l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
+--------------------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------------------FilterExec: l_shipdate@3 >= 9496 AND
l_shipdate@3 < 9587
+------------------------------------CsvExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_suppkey, l_extendedprice, l_discou [...]
query ITTTR
with revenue0 (supplier_no, total_revenue) as (
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index cbb1896efb..688774c906 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -200,13 +200,14 @@ Aggregate: groupBy=[[t1.name]], aggr=[[]]
physical_plan
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
--CoalesceBatchesExec: target_batch_size=8192
-----RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=12
-------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------UnionExec
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------ProjectionExec: expr=[name@0 || _new as name]
-------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+--------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+----------UnionExec
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------ProjectionExec: expr=[name@0 || _new as name]
+--------------MemoryExec: partitions=1, partition_sizes=[1]
# nested_union_all
query T rowsort
@@ -234,11 +235,14 @@ Union
----TableScan: t2 projection=[name]
physical_plan
UnionExec
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
+--MemoryExec: partitions=1, partition_sizes=[1]
--ProjectionExec: expr=[name@0 || _new as name]
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]
+# Make sure to choose a small batch size to introduce parallelism to the plan.
+statement ok
+set datafusion.execution.batch_size = 2;
# union_with_type_coercion
query TT
@@ -269,32 +273,36 @@ Union
physical_plan
UnionExec
--ProjectionExec: expr=[id@0 as id, name@1 as name]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(id@0,
CAST(t2.id AS Int32)@2), (name@1, name@1)]
--------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as
name], aggr=[]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
--------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name],
aggr=[]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, name@1],
4), input_partitions=4
------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS
Int32) as CAST(t2.id AS Int32)]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
--ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
----ProjectionExec: expr=[id@0 as id, name@1 as name]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
--------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(CAST(t2.id AS
Int32)@2, id@0), (name@1, name@1)]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2,
name@1], 4), input_partitions=4
--------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS
Int32) as CAST(t2.id AS Int32)]
----------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1
as name], aggr=[]
-------------------CoalesceBatchesExec: target_batch_size=8192
+------------------CoalesceBatchesExec: target_batch_size=2
--------------------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
----------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as
name], aggr=[]
-------------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------CoalesceBatchesExec: target_batch_size=8192
+------------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+--------------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=2
------------RepartitionExec: partitioning=Hash([id@0, name@1], 4),
input_partitions=4
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
query IT rowsort
(
@@ -339,26 +347,30 @@ Union
----TableScan: t1 projection=[name]
physical_plan
InterleaveExec
---CoalesceBatchesExec: target_batch_size=8192
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+--CoalesceBatchesExec: target_batch_size=2
----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
+------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
# union_upcast_types
query TT
@@ -416,15 +428,17 @@ ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
--AggregateExec: mode=SinglePartitioned, gby=[name@0 as name], aggr=[COUNT(*)]
----InterleaveExec
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
########
@@ -530,11 +544,10 @@ physical_plan
UnionExec
--ProjectionExec: expr=[Int64(1)@0 as a]
----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=4
+------CoalesceBatchesExec: target_batch_size=2
+--------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
-------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
---------------EmptyExec: produce_one_row=true
+------------EmptyExec: produce_one_row=true
--ProjectionExec: expr=[2 as a]
----EmptyExec: produce_one_row=true
--ProjectionExec: expr=[3 as a]
@@ -562,16 +575,12 @@ physical_plan
UnionExec
--ProjectionExec: expr=[COUNT(*)@1 as count, n@0 as n]
----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=4
+------CoalesceBatchesExec: target_batch_size=2
+--------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
-------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
---------------ProjectionExec: expr=[5 as n]
-----------------EmptyExec: produce_one_row=true
+------------ProjectionExec: expr=[5 as n]
+--------------EmptyExec: produce_one_row=true
--ProjectionExec: expr=[x@0 as count, y@1 as n]
----ProjectionExec: expr=[1 as x, MAX(Int64(10))@0 as y]
-------AggregateExec: mode=Final, gby=[], aggr=[MAX(Int64(10))]
---------CoalescePartitionsExec
-----------AggregateExec: mode=Partial, gby=[], aggr=[MAX(Int64(10))]
-------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
---------------EmptyExec: produce_one_row=true
+------AggregateExec: mode=Single, gby=[], aggr=[MAX(Int64(10))]
+--------EmptyExec: produce_one_row=true
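
(Editorial note, not part of the commit: taken together, the plan diffs follow
one rule: when an input's row count is exactly known and fits in a single
batch, fanning it out across partitions only adds empty partitions and
exchange overhead, which is why `EmptyExec: produce_one_row=true` inputs lose
their RoundRobinBatch repartitions and collapse to `mode=Single` aggregates.
A hypothetical sketch of such a check; the names are illustrative and the real
logic in enforce_distribution.rs differs in detail:)

    // Hypothetical benefit check, not the code in this commit.
    // `exact_row_count` stands in for exact statistics on the input plan;
    // `batch_size` comes from datafusion.execution.batch_size.
    fn repartition_beneficial(exact_row_count: Option<usize>, batch_size: usize) -> bool {
        match exact_row_count {
            // A known-small input fits in one batch: skip repartitioning.
            Some(n) => n > batch_size,
            // Unknown cardinality: keep repartitioning as before.
            None => true,
        }
    }

    fn main() {
        assert!(!repartition_beneficial(Some(1), 8192)); // one row: skip
        assert!(repartition_beneficial(None, 8192));     // unknown: keep
    }
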