This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9aacdee138 [MINOR]:Do not introduce unnecessary repartition when row 
count is 1. (#7832)
9aacdee138 is described below

commit 9aacdee13881f25335d8ea323311949b9799e6d3
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Oct 17 11:18:44 2023 +0300

    [MINOR]:Do not introduce unnecessary repartition when row count is 1. 
(#7832)
    
    * Initial commit
    
    * Fix failing tests
    
    * More idiomatic expressions
    
    * Update tests, use batch size during partition benefit check
    
    * Fix failing tests
    
    * is_exact when row count is 1
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/core/src/datasource/listing/table.rs    |  13 +-
 .../src/physical_optimizer/enforce_distribution.rs |  34 +++--
 datafusion/core/tests/sql/order.rs                 |   8 +-
 datafusion/physical-plan/src/aggregates/mod.rs     |  21 ++-
 datafusion/sqllogictest/test_files/aggregate.slt   |  20 +--
 datafusion/sqllogictest/test_files/copy.slt        |   2 +-
 datafusion/sqllogictest/test_files/groupby.slt     |  53 +++++---
 datafusion/sqllogictest/test_files/join.slt        |  10 +-
 datafusion/sqllogictest/test_files/joins.slt       | 148 +++++++++++----------
 datafusion/sqllogictest/test_files/options.slt     |   6 +-
 datafusion/sqllogictest/test_files/select.slt      |   3 +-
 datafusion/sqllogictest/test_files/subquery.slt    |  32 +++--
 .../sqllogictest/test_files/tpch/q15.slt.part      |  27 ++--
 datafusion/sqllogictest/test_files/union.slt       | 107 ++++++++-------
 14 files changed, 270 insertions(+), 214 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 0646243669..6ef214c97f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1615,10 +1615,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_append_new_json_files() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
         helper_test_append_new_files_to_table(
             FileType::JSON,
             FileCompressionType::UNCOMPRESSED,
-            None,
+            Some(config_map),
         )
         .await?;
         Ok(())
@@ -1637,10 +1639,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_append_new_csv_files() -> Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
         helper_test_append_new_files_to_table(
             FileType::CSV,
             FileCompressionType::UNCOMPRESSED,
-            None,
+            Some(config_map),
         )
         .await?;
         Ok(())
@@ -1648,10 +1652,12 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_append_new_parquet_files_defaults() -> 
Result<()> {
+        let mut config_map: HashMap<String, String> = HashMap::new();
+        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
         helper_test_append_new_files_to_table(
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
-            None,
+            Some(config_map),
         )
         .await?;
         Ok(())
@@ -1838,6 +1844,7 @@ mod tests {
             "datafusion.execution.parquet.write_batch_size".into(),
             "5".into(),
         );
+        config_map.insert("datafusion.execution.batch_size".into(), 
"1".into());
         helper_test_append_new_files_to_table(
             FileType::PARQUET,
             FileCompressionType::UNCOMPRESSED,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 9be566f10a..52525d1fc4 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1021,6 +1021,7 @@ fn add_hash_on_top(
     // until Repartition(Hash).
     dist_onward: &mut Option<ExecTree>,
     input_idx: usize,
+    repartition_beneficial_stats: bool,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     if n_target == input.output_partitioning().partition_count() && n_target 
== 1 {
         // In this case adding a hash repartition is unnecessary as the hash
@@ -1044,9 +1045,13 @@ fn add_hash_on_top(
         // - Usage of order preserving variants is not desirable (per the flag
         //   `config.optimizer.bounded_order_preserving_variants`).
         let should_preserve_ordering = input.output_ordering().is_some();
-        // Since hashing benefits from partitioning, add a round-robin 
repartition
-        // before it:
-        let mut new_plan = add_roundrobin_on_top(input, n_target, dist_onward, 
0)?;
+        let mut new_plan = if repartition_beneficial_stats {
+            // Since hashing benefits from partitioning, add a round-robin 
repartition
+            // before it:
+            add_roundrobin_on_top(input, n_target, dist_onward, 0)?
+        } else {
+            input
+        };
         new_plan = Arc::new(
             RepartitionExec::try_new(new_plan, Partitioning::Hash(hash_exprs, 
n_target))?
                 .with_preserve_order(should_preserve_ordering),
@@ -1223,6 +1228,7 @@ fn ensure_distribution(
     let enable_round_robin = config.optimizer.enable_round_robin_repartition;
     let repartition_file_scans = config.optimizer.repartition_file_scans;
     let repartition_file_min_size = config.optimizer.repartition_file_min_size;
+    let batch_size = config.execution.batch_size;
     let is_unbounded = unbounded_output(&dist_context.plan);
     // Use order preserving variants either of the conditions true
     // - it is desired according to config
@@ -1233,13 +1239,7 @@ fn ensure_distribution(
     if dist_context.plan.children().is_empty() {
         return Ok(Transformed::No(dist_context));
     }
-    // Don't need to apply when the returned row count is not greater than 1:
-    let stats = dist_context.plan.statistics();
-    let mut repartition_beneficial_stat = true;
-    if stats.is_exact {
-        repartition_beneficial_stat =
-            stats.num_rows.map(|num_rows| num_rows > 1).unwrap_or(true);
-    }
+
     // Remove unnecessary repartition from the physical plan if any
     let DistributionContext {
         mut plan,
@@ -1263,7 +1263,6 @@ fn ensure_distribution(
             plan = updated_window;
         }
     };
-
     let n_children = plan.children().len();
     // This loop iterates over all the children to:
     // - Increase parallelism for every child if it is beneficial.
@@ -1289,9 +1288,19 @@ fn ensure_distribution(
             maintains,
             child_idx,
         )| {
+            // Don't need to apply when the returned row count is not greater 
than 1:
+            let stats = child.statistics();
+            let repartition_beneficial_stats = if stats.is_exact {
+                stats
+                    .num_rows
+                    .map(|num_rows| num_rows > batch_size)
+                    .unwrap_or(true)
+            } else {
+                true
+            };
             if enable_round_robin
                 // Operator benefits from partitioning (e.g. filter):
-                && (would_benefit && repartition_beneficial_stat)
+                && (would_benefit && repartition_beneficial_stats)
                 // Unless partitioning doesn't increase the partition count, 
it is not beneficial:
                 && child.output_partitioning().partition_count() < 
target_partitions
             {
@@ -1340,6 +1349,7 @@ fn ensure_distribution(
                         target_partitions,
                         dist_onward,
                         child_idx,
+                        repartition_beneficial_stats,
                     )?;
                 }
                 Distribution::UnspecifiedDistribution => {}
diff --git a/datafusion/core/tests/sql/order.rs 
b/datafusion/core/tests/sql/order.rs
index 0142675bbd..6e3f6319e1 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -209,16 +209,16 @@ ORDER BY 1, 2;
         "        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as 
Int64(0), t@1 as t], aggr=[]",
         "          CoalesceBatchesExec: target_batch_size=8192",
         "            RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), 
input_partitions=2",
-        "              AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as 
t], aggr=[]",
-        "                RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "                AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 
as t], aggr=[]",
         "                  ProjectionExec: expr=[column1@0 as t]",
         "                    ValuesExec",
         "      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
         "        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as 
Int64(1), t@1 as t], aggr=[]",
         "          CoalesceBatchesExec: target_batch_size=8192",
         "            RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), 
input_partitions=2",
-        "              AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as 
t], aggr=[]",
-        "                RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "              RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1",
+        "                AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 
as t], aggr=[]",
         "                  ProjectionExec: expr=[column1@0 as t]",
         "                    ValuesExec",
     ];
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index c33f49049d..3c9dabf605 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1021,12 +1021,21 @@ impl ExecutionPlan for AggregateExec {
                     ..Default::default()
                 }
             }
-            _ => Statistics {
-                // the output row count is surely not larger than its input 
row count
-                num_rows: self.input.statistics().num_rows,
-                is_exact: false,
-                ..Default::default()
-            },
+            _ => {
+                let input_stats = self.input.statistics();
+                // Input statistics is exact and number of rows not greater 
than 1:
+                let is_exact = input_stats.is_exact
+                    && (input_stats
+                        .num_rows
+                        .map(|num_rows| num_rows == 1)
+                        .unwrap_or(false));
+                Statistics {
+                    // the output row count is surely not larger than its 
input row count
+                    num_rows: self.input.statistics().num_rows,
+                    is_exact,
+                    ..Default::default()
+                }
+            }
         }
     }
 }
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index c6f9f42fa2..777b634e93 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -2342,8 +2342,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 
@@ -2397,8 +2397,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)], lim=[4]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)], lim=[4]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)], lim=[4]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TT
@@ -2416,8 +2416,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], 
aggr=[MIN(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MIN(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MIN(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TT
@@ -2435,8 +2435,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TT
@@ -2454,8 +2454,8 @@ GlobalLimitExec: skip=0, fetch=4
 ------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([trace_id@0], 4), 
input_partitions=4
-------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
---------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], 
aggr=[MAX(traces.timestamp)]
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TI
diff --git a/datafusion/sqllogictest/test_files/copy.slt 
b/datafusion/sqllogictest/test_files/copy.slt
index a41d1fca66..f2fe216ee8 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -33,7 +33,7 @@ CopyTo: format=parquet 
output_url=test_files/scratch/copy/table single_file_outp
 --TableScan: source_table projection=[col1, col2]
 physical_plan
 InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
 
 # Error case
 query error DataFusion error: Invalid or Unsupported Configuration: Format not 
explicitly set and unable to get file extension!
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index b7070a8d7e..bf93c6633b 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2024,13 +2024,11 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
 ----------------CoalesceBatchesExec: target_batch_size=8192
 ------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(col0@0, col0@0)]
 --------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------RepartitionExec: partitioning=Hash([col0@0], 4), 
input_partitions=4
-------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), 
input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
 --------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------RepartitionExec: partitioning=Hash([col0@0], 4), 
input_partitions=4
-------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
---------------------------MemoryExec: partitions=1, partition_sizes=[3]
+----------------------RepartitionExec: partitioning=Hash([col0@0], 4), 
input_partitions=1
+------------------------MemoryExec: partitions=1, partition_sizes=[3]
 
 # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
 # a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2762,9 +2760,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
 ------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRR
@@ -2799,9 +2797,9 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
 --------CoalesceBatchesExec: target_batch_size=8192
 ----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
-------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
---------------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
+------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
+--------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount)]
+----------------SortExec: expr=[ts@1 ASC NULLS LAST]
 ------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query TRR
@@ -2815,6 +2813,11 @@ FRA 50 50
 GRC 30 30
 TUR 75 75
 
+# make sure that batch size is small. So that query below runs in multi 
partitions
+# row number of the sales_global is 5. Hence we choose batch size 4 to make is 
smaller.
+statement ok
+set datafusion.execution.batch_size = 4;
+
 # order-sensitive FIRST_VALUE and LAST_VALUE aggregators should work in
 # multi-partitions without group by also.
 query TT
@@ -2958,7 +2961,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount ASC NULLS LAST]@1 as array_agg1]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount)]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
 ------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount)]
 --------------SortExec: expr=[amount@1 ASC NULLS LAST]
@@ -2994,7 +2997,7 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) 
ORDER BY [sales_global.amount DESC NULLS FIRST]@1 as amounts, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS 
LAST]@2 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.amount 
DESC NULLS FIRST]@3 as fv2]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
 ------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[ARRAY_AGG(sales_global.amount), LAST_VALUE(sales_global.amount), 
LAST_VALUE(sales_global.amount)]
 --------------SortExec: expr=[amount@1 DESC]
@@ -3195,7 +3198,7 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
 --SortExec: expr=[sn@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[sn@0 as sn, amount@1 as amount, 2 * CAST(sn@0 AS 
Int64) as Int64(2) * s.sn]
 ------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as 
amount], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), 
input_partitions=8
 ------------AggregateExec: mode=Partial, gby=[sn@0 as sn, amount@1 as amount], 
aggr=[]
 --------------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 
0]
@@ -3248,7 +3251,7 @@ SortPreservingMergeExec: [sn@0 ASC NULLS LAST]
 --SortExec: expr=[sn@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[sn@0 as sn, SUM(l.amount)@2 as SUM(l.amount), 
amount@1 as amount]
 ------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, amount@1 as 
amount], aggr=[SUM(l.amount)]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([sn@0, amount@1], 8), 
input_partitions=8
 ------------AggregateExec: mode=Partial, gby=[sn@1 as sn, amount@2 as amount], 
aggr=[SUM(l.amount)]
 --------------ProjectionExec: expr=[amount@1 as amount, sn@2 as sn, amount@3 
as amount]
@@ -3396,7 +3399,7 @@ SortPreservingMergeExec: [sn@2 ASC NULLS LAST]
 --SortExec: expr=[sn@2 ASC NULLS LAST]
 ----ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 
as sn, ts@3 as ts, currency@4 as currency, amount@5 as amount, sum_amount@6 as 
sum_amount]
 ------AggregateExec: mode=FinalPartitioned, gby=[sn@0 as sn, zip_code@1 as 
zip_code, country@2 as country, ts@3 as ts, currency@4 as currency, amount@5 as 
amount, sum_amount@6 as sum_amount], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=4
 ----------RepartitionExec: partitioning=Hash([sn@0, zip_code@1, country@2, 
ts@3, currency@4, amount@5, sum_amount@6], 8), input_partitions=8
 ------------AggregateExec: mode=Partial, gby=[sn@2 as sn, zip_code@0 as 
zip_code, country@1 as country, ts@3 as ts, currency@4 as currency, amount@5 as 
amount, sum_amount@6 as sum_amount], aggr=[]
 --------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
@@ -3566,6 +3569,12 @@ ORDER BY y;
 2 1
 3 1
 
+# Make sure to choose a batch size smaller than, row number of the table.
+# In this case we choose 2 (Row number of the table is 3).
+# otherwise we won't see parallelism in tests.
+statement ok
+set datafusion.execution.batch_size = 2;
+
 # plan of the query above should contain partial
 # and final aggregation stages
 query TT
@@ -3579,7 +3588,8 @@ physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)]
-------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
 
 query I
 SELECT FIRST_VALUE(x)
@@ -3600,7 +3610,8 @@ physical_plan
 AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
 --CoalescePartitionsExec
 ----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)]
-------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+--------MemoryExec: partitions=1, partition_sizes=[1]
 
 
 query TT
@@ -3644,7 +3655,7 @@ Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_wi
 physical_plan
 AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
 --SortExec: expr=[c@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
 --------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
 ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
@@ -3685,7 +3696,7 @@ Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_wi
 physical_plan
 AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
 --SortExec: expr=[c@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([c@0, b@1], 8), input_partitions=8
 --------AggregateExec: mode=Partial, gby=[c@1 as c, b@0 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallyOrdered
 ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
diff --git a/datafusion/sqllogictest/test_files/join.slt 
b/datafusion/sqllogictest/test_files/join.slt
index 283ff57a98..874d849e9a 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -558,6 +558,10 @@ explain select * from t1 join t2 on false;
 logical_plan EmptyRelation
 physical_plan EmptyExec: produce_one_row=false
 
+# Make batch size smaller than table row number. to introduce parallelism to 
the plan.
+statement ok
+set datafusion.execution.batch_size = 1;
+
 # test covert inner join to cross join when condition is true
 query TT
 explain select * from t1 inner join t2 on true;
@@ -568,9 +572,9 @@ CrossJoin:
 --TableScan: t2 projection=[t2_id, t2_name, t2_int]
 physical_plan
 CrossJoinExec
---CoalescePartitionsExec
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
+--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+----MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 drop table IF EXISTS t1;
diff --git a/datafusion/sqllogictest/test_files/joins.slt 
b/datafusion/sqllogictest/test_files/joins.slt
index 01c0131fdb..cc90e64313 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -24,7 +24,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 statement ok
 set datafusion.explain.logical_plan_only = true;
@@ -185,6 +185,10 @@ FROM
 statement ok
 set datafusion.execution.target_partitions = 2;
 
+# make sure to a batch size smaller than row number of the table.
+statement ok
+set datafusion.execution.batch_size = 2;
+
 ##########
 ## Joins Tests
 ##########
@@ -1311,13 +1315,13 @@ Aggregate: groupBy=[[join_t1.t1_id]], aggr=[[]]
 physical_plan
 AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], aggr=[]
 --ProjectionExec: expr=[t1_id@0 as t1_id]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1339,13 +1343,13 @@ physical_plan
 ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
 --AggregateExec: mode=SinglePartitioned, gby=[t1_id@0 as t1_id], 
aggr=[COUNT(*)]
 ----ProjectionExec: expr=[t1_id@0 as t1_id]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([t2_id@0], 2), 
input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1372,13 +1376,13 @@ ProjectionExec: expr=[COUNT(alias1)@0 as COUNT(DISTINCT 
join_t1.t1_id)]
 --------AggregateExec: mode=FinalPartitioned, gby=[alias1@0 as alias1], aggr=[]
 ----------AggregateExec: mode=Partial, gby=[t1_id@0 as alias1], aggr=[]
 ------------ProjectionExec: expr=[t1_id@0 as t1_id]
---------------CoalesceBatchesExec: target_batch_size=4096
+--------------CoalesceBatchesExec: target_batch_size=2
 ----------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
t2_id@0)]
-------------------CoalesceBatchesExec: target_batch_size=4096
+------------------CoalesceBatchesExec: target_batch_size=2
 --------------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ------------------------MemoryExec: partitions=1, partition_sizes=[1]
-------------------CoalesceBatchesExec: target_batch_size=4096
+------------------CoalesceBatchesExec: target_batch_size=2
 --------------------RepartitionExec: partitioning=Hash([t2_id@0], 2), 
input_partitions=2
 ----------------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ------------------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1441,7 +1445,7 @@ Projection: join_t1.t1_id, join_t1.t1_name, 
join_t1.t1_int, join_t2.t2_id, join_
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, 
CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@4 as t2_id, t2_name@5 as t2_name, t2_int@6 as t2_int]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + 
Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)]
 --------CoalescePartitionsExec
 ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
@@ -1468,14 +1472,14 @@ Projection: join_t1.t1_id, join_t1.t1_name, 
join_t1.t1_int, join_t2.t2_id, join_
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int, 
CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@4 as t2_id, t2_name@5 as t2_name, t2_int@6 as t2_int]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + 
Int64(11)@3, CAST(join_t2.t2_id AS Int64)@3)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([join_t1.t1_id + Int64(11)@3], 
2), input_partitions=2
 ------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_int@2 as t1_int, CAST(t1_id@0 AS Int64) + 11 as join_t1.t1_id + Int64(11)]
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([CAST(join_t2.t2_id AS Int64)@3], 
2), input_partitions=2
 ------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, 
t2_int@2 as t2_int, CAST(t2_id@0 AS Int64) as CAST(join_t2.t2_id AS Int64)]
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
@@ -1501,7 +1505,7 @@ physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as 
t2_id]
 ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, 
join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, 
join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + 
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
 ----------CoalescePartitionsExec
 ------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
@@ -1529,14 +1533,14 @@ physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as 
t2_id]
 ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, 
join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, 
join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + 
UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 
2), input_partitions=2
 --------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as 
join_t2.t2_id + UInt32(1)]
 ----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 
2), input_partitions=2
 --------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
 ----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
@@ -1562,7 +1566,7 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as 
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + 
UInt32(11)@2, t2_id@0)]
 --------CoalescePartitionsExec
 ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
@@ -1589,14 +1593,14 @@ Projection: join_t1.t1_id, join_t2.t2_id, 
join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as 
t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + 
UInt32(11)@2, t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 
2), input_partitions=2
 ------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, 
t1_id@0 + 11 as join_t1.t1_id + UInt32(11)]
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1622,7 +1626,7 @@ physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as 
t2_id]
 ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as 
t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - 
UInt32(11)@1, t1_id@0)]
 ----------CoalescePartitionsExec
 ------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
@@ -1650,14 +1654,14 @@ physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as 
t2_id]
 ----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as 
t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - 
UInt32(11)@1, t1_id@0)]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 
2), input_partitions=2
 --------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as 
join_t2.t2_id - UInt32(11)]
 ----------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ------------------MemoryExec: partitions=1, partition_sizes=[1]
-----------CoalesceBatchesExec: target_batch_size=4096
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([t1_id@0], 2), 
input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -1680,7 +1684,7 @@ Inner Join: join_t1.t1_id = join_t2.t2_id - UInt32(11)
 --TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
 ----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, 
join_t2.t2_id - UInt32(11)@3)]
 ------MemoryExec: partitions=1, partition_sizes=[1]
 ------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, t2_int@2 
as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
@@ -1703,13 +1707,13 @@ Inner Join: join_t1.t1_id = join_t2.t2_id - UInt32(11)
 --TableScan: join_t2 projection=[t2_id, t2_name, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_int@2 as 
t1_int, t2_id@3 as t2_id, t2_name@4 as t2_name, t2_int@5 as t2_int]
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
 ----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, 
join_t2.t2_id - UInt32(11)@3)]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------MemoryExec: partitions=1, partition_sizes=[1]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@3], 2), 
input_partitions=2
 ----------ProjectionExec: expr=[t2_id@0 as t2_id, t2_name@1 as t2_name, 
t2_int@2 as t2_int, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
@@ -2032,13 +2036,13 @@ Inner Join:  Filter: join_t1.t1_id > join_t2.t2_id
 ------TableScan: join_t2 projection=[t2_id, t2_int]
 physical_plan
 NestedLoopJoinExec: join_type=Inner, filter=t1_id@0 > t2_id@1
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
 ----FilterExec: t1_id@0 > 10
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 --------MemoryExec: partitions=1, partition_sizes=[1]
 --CoalescePartitionsExec
 ----ProjectionExec: expr=[t2_id@0 as t2_id]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------FilterExec: t2_int@1 > 1
 ----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 ------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2073,11 +2077,11 @@ Right Join:  Filter: join_t1.t1_id < join_t2.t2_id
 physical_plan
 NestedLoopJoinExec: join_type=Right, filter=t1_id@0 < t2_id@1
 --CoalescePartitionsExec
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------FilterExec: t1_id@0 > 22
 --------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 ----------MemoryExec: partitions=1, partition_sizes=[1]
---CoalesceBatchesExec: target_batch_size=4096
+--CoalesceBatchesExec: target_batch_size=2
 ----FilterExec: t2_id@0 > 11
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 --------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2643,7 +2647,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 # explain sort_merge_join_on_date32 inner sort merge join on data type (Date32)
 query TT
@@ -2658,12 +2662,12 @@ Inner Join: t1.c1 = t2.c1
 physical_plan
 SortMergeJoin: join_type=Inner, on=[(c1@0, c1@0)]
 --SortExec: expr=[c1@0 ASC]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ----------MemoryExec: partitions=1, partition_sizes=[1]
 --SortExec: expr=[c1@0 ASC]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------RepartitionExec: partitioning=Hash([c1@0], 2), input_partitions=2
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ----------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2689,13 +2693,13 @@ physical_plan
 ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as c4, c1@5 as 
c1, c2@6 as c2, c3@7 as c3, c4@8 as c4]
 --SortMergeJoin: join_type=Right, on=[(CAST(t1.c3 AS Decimal128(10, 2))@4, 
c3@2)]
 ----SortExec: expr=[CAST(t1.c3 AS Decimal128(10, 2))@4 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([CAST(t1.c3 AS Decimal128(10, 
2))@4], 2), input_partitions=2
 ----------ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c4@3 as 
c4, CAST(c3@2 AS Decimal128(10, 2)) as CAST(t1.c3 AS Decimal128(10, 2))]
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
 ----SortExec: expr=[c3@2 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([c3@2], 2), input_partitions=2
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2722,7 +2726,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 
 
@@ -2743,7 +2747,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 query TT
 explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id 
IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2751,13 +2755,13 @@ explain SELECT t1_id, t1_name FROM 
left_semi_anti_join_table_t1 t1 WHERE t1_id I
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, 
t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2792,13 +2796,13 @@ explain SELECT t1_id, t1_name FROM 
left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, 
t2_id@0)]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2827,7 +2831,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 #Test the left_semi_join scenarios where the current repartition_joins 
parameter is set to false .
 ####
@@ -2846,7 +2850,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 query TT
 explain SELECT t1_id, t1_name FROM left_semi_anti_join_table_t1 t1 WHERE t1_id 
IN (SELECT t2_id FROM left_semi_anti_join_table_t2 t2) ORDER BY t1_id
@@ -2854,7 +2858,7 @@ explain SELECT t1_id, t1_name FROM 
left_semi_anti_join_table_t1 t1 WHERE t1_id I
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, 
t2_id@0)]
 --------MemoryExec: partitions=1, partition_sizes=[1]
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2890,7 +2894,7 @@ explain SELECT t1_id, t1_name FROM 
left_semi_anti_join_table_t1 t1 LEFT SEMI JOI
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, 
t2_id@0)]
 --------MemoryExec: partitions=1, partition_sizes=[1]
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -2920,7 +2924,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 
 #Test the right_semi_join scenarios where the current repartition_joins 
parameter is set to true .
@@ -2940,7 +2944,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 query TT
 explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 
WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = 
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -2948,13 +2952,13 @@ explain SELECT t1_id, t1_name, t1_int FROM 
right_semi_anti_join_table_t1 t1 WHER
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@1 != t1_name@0
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -2970,13 +2974,13 @@ explain SELECT t1_id, t1_name, t1_int FROM 
right_semi_anti_join_table_t2 t2 RIGH
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@0 != t1_name@1
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
@@ -3002,7 +3006,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 
 #Test the right_semi_join scenarios where the current repartition_joins 
parameter is set to false .
@@ -3022,7 +3026,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 query TT
 explain SELECT t1_id, t1_name, t1_int FROM right_semi_anti_join_table_t1 t1 
WHERE EXISTS (SELECT * FROM right_semi_anti_join_table_t2 t2 where t2.t2_id = 
t1.t1_id and t2.t2_name <> t1.t1_name) ORDER BY t1_id
@@ -3030,7 +3034,7 @@ explain SELECT t1_id, t1_name, t1_int FROM 
right_semi_anti_join_table_t1 t1 WHER
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@1 != t1_name@0
 --------MemoryExec: partitions=1, partition_sizes=[1]
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -3047,7 +3051,7 @@ explain SELECT t1_id, t1_name, t1_int FROM 
right_semi_anti_join_table_t2 t2 RIGH
 physical_plan
 SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST]
 --SortExec: expr=[t1_id@0 ASC NULLS LAST]
-----CoalesceBatchesExec: target_batch_size=4096
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, 
t1_id@0)], filter=t2_name@0 != t1_name@1
 --------MemoryExec: partitions=1, partition_sizes=[1]
 --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
@@ -3074,7 +3078,7 @@ statement ok
 set datafusion.execution.target_partitions = 2;
 
 statement ok
-set datafusion.execution.batch_size = 4096;
+set datafusion.execution.batch_size = 2;
 
 
 ####
@@ -3126,14 +3130,14 @@ physical_plan
 SortPreservingMergeExec: [rn1@5 ASC NULLS LAST]
 --SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
 ----SortExec: expr=[rn1@5 ASC NULLS LAST]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, 
d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@5 as rn1]
 --------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
 ----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
 ----SortExec: expr=[a@1 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
@@ -3162,12 +3166,12 @@ physical_plan
 SortPreservingMergeExec: [rn1@10 ASC NULLS LAST]
 --SortMergeJoin: join_type=Right, on=[(a@1, a@1)]
 ----SortExec: expr=[a@1 ASC]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
 ----SortExec: expr=[rn1@5 ASC NULLS LAST]
-------CoalesceBatchesExec: target_batch_size=4096
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, 
d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@5 as rn1]
@@ -3203,14 +3207,14 @@ SortPreservingMergeExec: [a@1 ASC,b@2 ASC NULLS 
LAST,c@3 ASC NULLS LAST,rn1@11 A
 --SortExec: expr=[a@1 ASC,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,rn1@11 ASC 
NULLS LAST]
 ----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
 ------SortExec: expr=[a@1 ASC]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, 
d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@5 as rn1]
 ----------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: 
false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame 
{ units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)) }], mode=[Sorted]
 ------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
 ------SortExec: expr=[a@1 ASC]
---------CoalesceBatchesExec: target_batch_size=4096
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
 --------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, 
d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@5 as rn1]
@@ -3245,7 +3249,7 @@ Sort: r_table.rn1 ASC NULLS LAST
 --------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
 ----------TableScan: annotated_data projection=[a0, a, b, c, d]
 physical_plan
-CoalesceBatchesExec: target_batch_size=4096
+CoalesceBatchesExec: target_batch_size=2
 --HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@1, a@1)]
 ----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
 ----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, 
ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
@@ -3272,7 +3276,7 @@ Sort: r_table.rn1 ASC NULLS LAST
 --------WindowAggr: windowExpr=[[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING]]
 ----------TableScan: annotated_data projection=[a0, a, b, c, d]
 physical_plan
-CoalesceBatchesExec: target_batch_size=4096
+CoalesceBatchesExec: target_batch_size=2
 --HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(a@0, a@1)]
 ----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], 
output_ordering=[a@0 ASC], has_header=true
 ----ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, 
ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
diff --git a/datafusion/sqllogictest/test_files/options.slt 
b/datafusion/sqllogictest/test_files/options.slt
index 5fbb2102f4..83fe85745e 100644
--- a/datafusion/sqllogictest/test_files/options.slt
+++ b/datafusion/sqllogictest/test_files/options.slt
@@ -33,7 +33,7 @@ Filter: a.c0 < Int32(1)
 physical_plan
 CoalesceBatchesExec: target_batch_size=8192
 --FilterExec: c0@0 < 1
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]
 
 ##
 # test_disable_coalesce
@@ -51,7 +51,7 @@ Filter: a.c0 < Int32(1)
 --TableScan: a projection=[c0]
 physical_plan
 FilterExec: c0@0 < 1
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.execution.coalesce_batches = true
@@ -74,7 +74,7 @@ Filter: a.c0 < Int32(1)
 physical_plan
 CoalesceBatchesExec: target_batch_size=1234
 --FilterExec: c0@0 < 1
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]
 
 
 statement ok
diff --git a/datafusion/sqllogictest/test_files/select.slt 
b/datafusion/sqllogictest/test_files/select.slt
index b099107358..1d42747976 100644
--- a/datafusion/sqllogictest/test_files/select.slt
+++ b/datafusion/sqllogictest/test_files/select.slt
@@ -485,8 +485,7 @@ Projection: select_between_data.c1 >= Int64(2) AND 
select_between_data.c1 <= Int
 --TableScan: select_between_data projection=[c1]
 physical_plan
 ProjectionExec: expr=[c1@0 >= 2 AND c1@0 <= 3 as select_between_data.c1 
BETWEEN Int64(2) AND Int64(3)]
---RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-----MemoryExec: partitions=1, partition_sizes=[1]
+--MemoryExec: partitions=1, partition_sizes=[1]
 
 
 # TODO: query_get_indexed_field
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 2eccb60aad..7cbb848f33 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+# make sure to a batch size smaller than row number of the table.
+statement ok
+set datafusion.execution.batch_size = 2;
+
 #############
 ## Subquery Tests
 #############
@@ -178,15 +182,15 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS 
t2_sum
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
 --ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), 
t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
 --------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
 ----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
-------------CoalesceBatchesExec: target_batch_size=8192
+------------CoalesceBatchesExec: target_batch_size=2
 --------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
 ----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
 ------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
 ------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
@@ -213,15 +217,15 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * 
Float64(1)) + Int64(1) AS t2
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + 
Int64(1)@1 as t2_sum]
 --ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) + 
Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
 --------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as 
SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
 ----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int * Float64(1))]
-------------CoalesceBatchesExec: target_batch_size=8192
+------------CoalesceBatchesExec: target_batch_size=2
 --------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
 ----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int * Float64(1))]
 ------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
 ------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
@@ -247,16 +251,16 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS 
t2_sum
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
+--CoalesceBatchesExec: target_batch_size=2
 ----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
 ----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([t2_id@1], 4), input_partitions=4
 ----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
 ------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id, 
Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
---------------CoalesceBatchesExec: target_batch_size=8192
+--------------CoalesceBatchesExec: target_batch_size=2
 ----------------RepartitionExec: partitioning=Hash([t2_id@0, Utf8("a")@1], 4), 
input_partitions=4
 ------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as 
Utf8("a")], aggr=[SUM(t2.t2_int)]
 --------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
@@ -285,17 +289,17 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS 
t2_sum
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
 --ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), 
t2_id@1 as t2_id]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
 --------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
 ------------FilterExec: SUM(t2.t2_int)@1 < 3
 --------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
-----------------CoalesceBatchesExec: target_batch_size=8192
+----------------CoalesceBatchesExec: target_batch_size=2
 ------------------RepartitionExec: partitioning=Hash([t2_id@0], 4), 
input_partitions=4
 --------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], 
aggr=[SUM(t2.t2_int)]
 ----------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
 ------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
diff --git a/datafusion/sqllogictest/test_files/tpch/q15.slt.part 
b/datafusion/sqllogictest/test_files/tpch/q15.slt.part
index 4515b8ae1f..a872e96acf 100644
--- a/datafusion/sqllogictest/test_files/tpch/q15.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/q15.slt.part
@@ -95,20 +95,19 @@ SortPreservingMergeExec: [s_suppkey@0 ASC NULLS LAST]
 ----------------------------------FilterExec: l_shipdate@3 >= 9496 AND 
l_shipdate@3 < 9587
 ------------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_discou [...]
 ----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: 
partitioning=Hash([MAX(revenue0.total_revenue)@0], 4), input_partitions=4
---------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-----------------AggregateExec: mode=Final, gby=[], 
aggr=[MAX(revenue0.total_revenue)]
-------------------CoalescePartitionsExec
---------------------AggregateExec: mode=Partial, gby=[], 
aggr=[MAX(revenue0.total_revenue)]
-----------------------ProjectionExec: expr=[SUM(lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount)@1 as total_revenue]
-------------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 
as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
---------------------------CoalesceBatchesExec: target_batch_size=8192
-----------------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 
4), input_partitions=4
-------------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as 
l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
---------------------------------ProjectionExec: expr=[l_suppkey@0 as 
l_suppkey, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
-----------------------------------CoalesceBatchesExec: target_batch_size=8192
-------------------------------------FilterExec: l_shipdate@3 >= 9496 AND 
l_shipdate@3 < 9587
---------------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_disc [...]
+------------RepartitionExec: 
partitioning=Hash([MAX(revenue0.total_revenue)@0], 4), input_partitions=1
+--------------AggregateExec: mode=Final, gby=[], 
aggr=[MAX(revenue0.total_revenue)]
+----------------CoalescePartitionsExec
+------------------AggregateExec: mode=Partial, gby=[], 
aggr=[MAX(revenue0.total_revenue)]
+--------------------ProjectionExec: expr=[SUM(lineitem.l_extendedprice * 
Int64(1) - lineitem.l_discount)@1 as total_revenue]
+----------------------AggregateExec: mode=FinalPartitioned, gby=[l_suppkey@0 
as l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
+------------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------------RepartitionExec: partitioning=Hash([l_suppkey@0], 
4), input_partitions=4
+----------------------------AggregateExec: mode=Partial, gby=[l_suppkey@0 as 
l_suppkey], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - 
lineitem.l_discount)]
+------------------------------ProjectionExec: expr=[l_suppkey@0 as l_suppkey, 
l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount]
+--------------------------------CoalesceBatchesExec: target_batch_size=8192
+----------------------------------FilterExec: l_shipdate@3 >= 9496 AND 
l_shipdate@3 < 9587
+------------------------------------CsvExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
 projection=[l_suppkey, l_extendedprice, l_discou [...]
 
 query ITTTR
 with revenue0 (supplier_no, total_revenue) as (
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index cbb1896efb..688774c906 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -200,13 +200,14 @@ Aggregate: groupBy=[[t1.name]], aggr=[[]]
 physical_plan
 AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
 --CoalesceBatchesExec: target_batch_size=8192
-----RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=12
-------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------UnionExec
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------ProjectionExec: expr=[name@0 || _new as name]
-------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+--------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
+----------UnionExec
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------MemoryExec: partitions=1, partition_sizes=[1]
+------------ProjectionExec: expr=[name@0 || _new as name]
+--------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # nested_union_all
 query T rowsort
@@ -234,11 +235,14 @@ Union
 ----TableScan: t2 projection=[name]
 physical_plan
 UnionExec
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--MemoryExec: partitions=1, partition_sizes=[1]
+--MemoryExec: partitions=1, partition_sizes=[1]
 --ProjectionExec: expr=[name@0 || _new as name]
-----MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----MemoryExec: partitions=1, partition_sizes=[1]
 
+# Make sure to choose a small batch size to introduce parallelism to the plan.
+statement ok
+set datafusion.execution.batch_size = 2;
 
 # union_with_type_coercion
 query TT
@@ -269,32 +273,36 @@ Union
 physical_plan
 UnionExec
 --ProjectionExec: expr=[id@0 as id, name@1 as name]
-----CoalesceBatchesExec: target_batch_size=8192
+----CoalesceBatchesExec: target_batch_size=2
 ------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(id@0, 
CAST(t2.id AS Int32)@2), (name@1, name@1)]
 --------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 as 
name], aggr=[]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
 --------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as name], 
aggr=[]
-----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---------CoalesceBatchesExec: target_batch_size=8192
+----------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, name@1], 
4), input_partitions=4
 ------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS 
Int32) as CAST(t2.id AS Int32)]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 --ProjectionExec: expr=[CAST(id@0 AS Int32) as id, name@1 as name]
 ----ProjectionExec: expr=[id@0 as id, name@1 as name]
-------CoalesceBatchesExec: target_batch_size=8192
+------CoalesceBatchesExec: target_batch_size=2
 --------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(CAST(t2.id AS 
Int32)@2, id@0), (name@1, name@1)]
-----------CoalesceBatchesExec: target_batch_size=8192
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([CAST(t2.id AS Int32)@2, 
name@1], 4), input_partitions=4
 --------------ProjectionExec: expr=[id@0 as id, name@1 as name, CAST(id@0 AS 
Int32) as CAST(t2.id AS Int32)]
 ----------------AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, name@1 
as name], aggr=[]
-------------------CoalesceBatchesExec: target_batch_size=8192
+------------------CoalesceBatchesExec: target_batch_size=2
 --------------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
 ----------------------AggregateExec: mode=Partial, gby=[id@0 as id, name@1 as 
name], aggr=[]
-------------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-----------CoalesceBatchesExec: target_batch_size=8192
+------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+--------------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=2
 ------------RepartitionExec: partitioning=Hash([id@0, name@1], 4), 
input_partitions=4
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 query IT rowsort
 (
@@ -339,26 +347,30 @@ Union
 ----TableScan: t1 projection=[name]
 physical_plan
 InterleaveExec
---CoalesceBatchesExec: target_batch_size=8192
+--CoalesceBatchesExec: target_batch_size=2
 ----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]
 ------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
 ------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
---CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
+--CoalesceBatchesExec: target_batch_size=2
 ----HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(name@0, name@0)]
 ------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
 ------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
-------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
+------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
-----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # union_upcast_types
 query TT
@@ -416,15 +428,17 @@ ProjectionExec: expr=[COUNT(*)@1 as COUNT(*)]
 --AggregateExec: mode=SinglePartitioned, gby=[name@0 as name], aggr=[COUNT(*)]
 ----InterleaveExec
 ------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
 ------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 ------AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], aggr=[]
---------CoalesceBatchesExec: target_batch_size=8192
+--------CoalesceBatchesExec: target_batch_size=2
 ----------RepartitionExec: partitioning=Hash([name@0], 4), input_partitions=4
 ------------AggregateExec: mode=Partial, gby=[name@0 as name], aggr=[]
---------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 
 
 ########
@@ -530,11 +544,10 @@ physical_plan
 UnionExec
 --ProjectionExec: expr=[Int64(1)@0 as a]
 ----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=4
+------CoalesceBatchesExec: target_batch_size=2
+--------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
 ----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
-------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
---------------EmptyExec: produce_one_row=true
+------------EmptyExec: produce_one_row=true
 --ProjectionExec: expr=[2 as a]
 ----EmptyExec: produce_one_row=true
 --ProjectionExec: expr=[3 as a]
@@ -562,16 +575,12 @@ physical_plan
 UnionExec
 --ProjectionExec: expr=[COUNT(*)@1 as count, n@0 as n]
 ----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)]
-------CoalesceBatchesExec: target_batch_size=8192
---------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=4
+------CoalesceBatchesExec: target_batch_size=2
+--------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
 ----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
-------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
---------------ProjectionExec: expr=[5 as n]
-----------------EmptyExec: produce_one_row=true
+------------ProjectionExec: expr=[5 as n]
+--------------EmptyExec: produce_one_row=true
 --ProjectionExec: expr=[x@0 as count, y@1 as n]
 ----ProjectionExec: expr=[1 as x, MAX(Int64(10))@0 as y]
-------AggregateExec: mode=Final, gby=[], aggr=[MAX(Int64(10))]
---------CoalescePartitionsExec
-----------AggregateExec: mode=Partial, gby=[], aggr=[MAX(Int64(10))]
-------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
---------------EmptyExec: produce_one_row=true
+------AggregateExec: mode=Single, gby=[], aggr=[MAX(Int64(10))]
+--------EmptyExec: produce_one_row=true

Reply via email to