This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 504f247674 Make memory exec partition 1 (#7148)
504f247674 is described below

commit 504f24767486b8bf9cb08dd54b829b1654f1054f
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jul 31 23:39:38 2023 +0300

    Make memory exec partition 1 (#7148)
---
 .../src/physical_optimizer/sort_enforcement.rs     | 72 +++++++++++-----------
 .../core/src/physical_optimizer/test_utils.rs      |  2 +-
 2 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index d8ee638e3b..5d8b44f50d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1445,11 +1445,11 @@ mod tests {
         let expected_input = vec![
             "SortExec: expr=[nullable_col@0 ASC]",
             "  SortExec: expr=[non_nullable_col@1 ASC]",
-            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortExec: expr=[nullable_col@0 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1503,7 +1503,7 @@ mod tests {
             "      BoundedWindowAggExec: wdw=[count: Ok(Field { name: 
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: 
Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
             "        CoalesceBatchesExec: target_batch_size=128",
             "          SortExec: expr=[non_nullable_col@1 DESC]",
-            "            MemoryExec: partitions=0, partition_sizes=[]",
+            "            MemoryExec: partitions=1, partition_sizes=[0]",
         ];
 
         let expected_optimized = vec![
@@ -1512,7 +1512,7 @@ mod tests {
             "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
             "      CoalesceBatchesExec: target_batch_size=128",
             "        SortExec: expr=[non_nullable_col@1 DESC]",
-            "          MemoryExec: partitions=0, partition_sizes=[]",
+            "          MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1529,11 +1529,11 @@ mod tests {
 
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortExec: expr=[nullable_col@0 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1555,11 +1555,11 @@ mod tests {
             "  SortExec: expr=[nullable_col@0 ASC]",
             "    SortPreservingMergeExec: [nullable_col@0 ASC]",
             "      SortExec: expr=[nullable_col@0 ASC]",
-            "        MemoryExec: partitions=0, partition_sizes=[]",
+            "        MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortExec: expr=[nullable_col@0 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1592,13 +1592,13 @@ mod tests {
             "        SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "          SortPreservingMergeExec: [non_nullable_col@1 ASC]",
             "            SortExec: expr=[non_nullable_col@1 ASC]",
-            "              MemoryExec: partitions=0, partition_sizes=[]",
+            "              MemoryExec: partitions=1, partition_sizes=[0]",
         ];
 
         let expected_optimized = vec![
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=10",
-            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1635,14 +1635,14 @@ mod tests {
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        SortPreservingMergeExec: [non_nullable_col@1 ASC]",
             "          SortExec: expr=[non_nullable_col@1 ASC]",
-            "            MemoryExec: partitions=0, partition_sizes=[]",
+            "            MemoryExec: partitions=1, partition_sizes=[0]",
         ];
 
         let expected_optimized = vec![
             "AggregateExec: mode=Final, gby=[], aggr=[]",
             "  CoalescePartitionsExec",
-            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "      MemoryExec: partitions=0, partition_sizes=[]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1685,10 +1685,10 @@ mod tests {
             "    SortPreservingMergeExec: [non_nullable_col@1 ASC]",
             "      SortExec: expr=[non_nullable_col@1 ASC]",
             "        UnionExec",
-            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "            MemoryExec: partitions=0, partition_sizes=[]",
-            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "            MemoryExec: partitions=0, partition_sizes=[]",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "            MemoryExec: partitions=1, partition_sizes=[0]",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "            MemoryExec: partitions=1, partition_sizes=[0]",
         ];
 
         let expected_optimized = vec![
@@ -1696,10 +1696,10 @@ mod tests {
             "  SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
             "    FilterExec: NOT non_nullable_col@1",
             "      UnionExec",
-            "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "          MemoryExec: partitions=0, partition_sizes=[]",
-            "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "          MemoryExec: partitions=0, partition_sizes=[]",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "          MemoryExec: partitions=1, partition_sizes=[0]",
+            "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "          MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1723,13 +1723,13 @@ mod tests {
         let expected_input = vec![
             "SortExec: expr=[a@2 ASC]",
             "  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0, 
c@2)]",
-            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
             "    ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, 
c, d, e], output_ordering=[a@0 ASC]",
         ];
 
         let expected_optimized = vec![
             "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0, 
c@2)]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
             "  ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[a@0 ASC]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
@@ -1755,11 +1755,11 @@ mod tests {
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  SortPreservingMergeExec: [non_nullable_col@1 ASC]",
             "    SortPreservingMergeExec: [non_nullable_col@1 ASC]",
-            "      MemoryExec: partitions=0, partition_sizes=[]",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortExec: expr=[nullable_col@0 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1824,11 +1824,11 @@ mod tests {
         let expected_input = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 
ASC]",
             "  SortExec: expr=[nullable_col@0 ASC]",
-            "    MemoryExec: partitions=0, partition_sizes=[]",
+            "    MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -1851,11 +1851,11 @@ mod tests {
             "SortPreservingMergeExec: [non_nullable_col@1 ASC]",
             "  SortExec: expr=[nullable_col@0 ASC]",
             "    SortPreservingMergeExec: [nullable_col@0 
ASC,non_nullable_col@1 ASC]",
-            "      MemoryExec: partitions=0, partition_sizes=[]",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortExec: expr=[non_nullable_col@1 ASC]",
-            "  MemoryExec: partitions=0, partition_sizes=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -2632,7 +2632,7 @@ mod tests {
             "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
             "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
             "      SortExec: expr=[nullable_col@0 ASC]",
-            "        MemoryExec: partitions=0, partition_sizes=[]",
+            "        MemoryExec: partitions=1, partition_sizes=[0]",
         ];
 
         let expected_optimized = vec![
@@ -2640,7 +2640,7 @@ mod tests {
             "  BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
             "    BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), 
end_bound: CurrentRow }], mode=[Sorted]",
             "      SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
-            "        MemoryExec: partitions=0, partition_sizes=[]",
+            "        MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
@@ -2754,14 +2754,14 @@ mod tests {
             "    SortExec: expr=[nullable_col@0 ASC]",
             "      RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "        CoalescePartitionsExec",
-            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "            MemoryExec: partitions=0, partition_sizes=[]",
+            "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "            MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         let expected_optimized = vec![
             "SortPreservingMergeExec: [nullable_col@0 ASC]",
             "  SortExec: expr=[nullable_col@0 ASC]",
-            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
-            "      MemoryExec: partitions=0, partition_sizes=[]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+            "      MemoryExec: partitions=1, partition_sizes=[0]",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
         Ok(())
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 0386bcd3d0..5c8d8b1e64 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -194,7 +194,7 @@ pub fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execut
 }
 
 pub(crate) fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
-    Arc::new(MemoryExec::try_new(&[], schema.clone(), None).unwrap())
+    Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
 }
 
 pub fn hash_join_exec(

Reply via email to