This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 504f247674 Make memory exec partition 1 (#7148)
504f247674 is described below
commit 504f24767486b8bf9cb08dd54b829b1654f1054f
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Jul 31 23:39:38 2023 +0300
Make memory exec partition 1 (#7148)
---
.../src/physical_optimizer/sort_enforcement.rs | 72 +++++++++++-----------
.../core/src/physical_optimizer/test_utils.rs | 2 +-
2 files changed, 37 insertions(+), 37 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index d8ee638e3b..5d8b44f50d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -1445,11 +1445,11 @@ mod tests {
let expected_input = vec![
"SortExec: expr=[nullable_col@0 ASC]",
" SortExec: expr=[non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1503,7 +1503,7 @@ mod tests {
" BoundedWindowAggExec: wdw=[count: Ok(Field { name:
\"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Range, start_bound:
Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
@@ -1512,7 +1512,7 @@ mod tests {
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }], mode=[Sorted]",
" CoalesceBatchesExec: target_batch_size=128",
" SortExec: expr=[non_nullable_col@1 DESC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1529,11 +1529,11 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1555,11 +1555,11 @@ mod tests {
" SortExec: expr=[nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1592,13 +1592,13 @@ mod tests {
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1
ASC]",
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
" SortExec: expr=[non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=10",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1635,14 +1635,14 @@ mod tests {
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
" SortExec: expr=[non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"AggregateExec: mode=Final, gby=[], aggr=[]",
" CoalescePartitionsExec",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1685,10 +1685,10 @@ mod tests {
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
" SortExec: expr=[non_nullable_col@1 ASC]",
" UnionExec",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
@@ -1696,10 +1696,10 @@ mod tests {
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" FilterExec: NOT non_nullable_col@1",
" UnionExec",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1723,13 +1723,13 @@ mod tests {
let expected_input = vec![
"SortExec: expr=[a@2 ASC]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0,
c@2)]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC]",
];
let expected_optimized = vec![
"HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0,
c@2)]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
" ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[a@0 ASC]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
@@ -1755,11 +1755,11 @@ mod tests {
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
" SortPreservingMergeExec: [non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1824,11 +1824,11 @@ mod tests {
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1
ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -1851,11 +1851,11 @@ mod tests {
"SortPreservingMergeExec: [non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0
ASC,non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortExec: expr=[non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -2632,7 +2632,7 @@ mod tests {
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
@@ -2640,7 +2640,7 @@ mod tests {
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }], mode=[Sorted]",
" BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL),
end_bound: CurrentRow }], mode=[Sorted]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
@@ -2754,14 +2754,14 @@ mod tests {
" SortExec: expr=[nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
" CoalescePartitionsExec",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: expr=[nullable_col@0 ASC]",
- " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=0",
- " MemoryExec: partitions=0, partition_sizes=[]",
+ " RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
+ " MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
Ok(())
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 0386bcd3d0..5c8d8b1e64 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -194,7 +194,7 @@ pub fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn Execut
}
pub(crate) fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
- Arc::new(MemoryExec::try_new(&[], schema.clone(), None).unwrap())
+ Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
}
pub fn hash_join_exec(