This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4c6f5c5310 Fix column indices in the planning tests (#8191)
4c6f5c5310 is described below
commit 4c6f5c5310f82e7aed3c5634b4d6c58d8780d9e5
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Nov 16 10:01:20 2023 +0300
Fix column indices in the planning tests (#8191)
---
.../src/physical_optimizer/enforce_distribution.rs | 10 +-
.../replace_with_order_preserving_variants.rs | 185 +++++++++++----------
2 files changed, 108 insertions(+), 87 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 12f27ab18f..4aedc3b0d1 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -3794,7 +3794,11 @@ pub(crate) mod tests {
sort_key,
projection_exec_with_alias(
filter_exec(parquet_exec()),
- vec![("a".to_string(), "a".to_string())],
+ vec![
+ ("a".to_string(), "a".to_string()),
+ ("b".to_string(), "b".to_string()),
+ ("c".to_string(), "c".to_string()),
+ ],
),
false,
);
@@ -3803,7 +3807,7 @@ pub(crate) mod tests {
"SortPreservingMergeExec: [c@2 ASC]",
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
"SortExec: expr=[c@2 ASC]",
- "ProjectionExec: expr=[a@0 as a]",
+ "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"FilterExec: c@2 = 0",
// repartition is lowest down
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
@@ -3815,7 +3819,7 @@ pub(crate) mod tests {
let expected_first_sort_enforcement = &[
"SortExec: expr=[c@2 ASC]",
"CoalescePartitionsExec",
- "ProjectionExec: expr=[a@0 as a]",
+ "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
"FilterExec: c@2 = 0",
"RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e]",
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 58806be6d4..7f8c9b852c 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -359,11 +359,11 @@ mod tests {
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -379,38 +379,41 @@ mod tests {
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let sort = sort_exec(
- vec![sort_expr_default("a", &schema)],
+ vec![sort_expr_default("a", &coalesce_partitions.schema())],
coalesce_partitions,
false,
);
let repartition_rr2 = repartition_exec_round_robin(sort);
let repartition_hash2 = repartition_exec_hash(repartition_rr2);
- let filter = filter_exec(repartition_hash2, &schema);
- let sort2 = sort_exec(vec![sort_expr_default("a", &schema)], filter,
true);
+ let filter = filter_exec(repartition_hash2);
+ let sort2 =
+ sort_exec(vec![sort_expr_default("a", &filter.schema())], filter,
true);
- let physical_plan =
- sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)],
sort2);
+ let physical_plan = sort_preserving_merge_exec(
+ vec![sort_expr_default("a", &sort2.schema())],
+ sort2,
+ );
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC],
has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC]",
- " FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC",
+ " FilterExec: c@1 > 3",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" SortPreservingMergeExec: [a@0 ASC]",
- " SortPreservingRepartitionExec:
partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1],
8), input_partitions=8, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC],
has_header=true",
];
@@ -424,7 +427,7 @@ mod tests {
let sort_exprs = vec![sort_expr("a", &schema)];
let source = csv_exec_sorted(&schema, sort_exprs, true);
let repartition_rr = repartition_exec_round_robin(source);
- let filter = filter_exec(repartition_rr, &schema);
+ let filter = filter_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(filter);
let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash,
true);
@@ -433,14 +436,14 @@ mod tests {
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
- " FilterExec: c@2 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
- " FilterExec: c@2 > 3",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " FilterExec: c@1 > 3",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
@@ -456,7 +459,7 @@ mod tests {
let source = csv_exec_sorted(&schema, sort_exprs, true);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
- let filter = filter_exec(repartition_hash, &schema);
+ let filter = filter_exec(repartition_hash);
let coalesce_batches_exec: Arc<dyn ExecutionPlan> =
coalesce_batches_exec(filter);
let sort = sort_exec(vec![sort_expr("a", &schema)],
coalesce_batches_exec, true);
@@ -466,14 +469,14 @@ mod tests {
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " FilterExec: c@1 > 3",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -488,7 +491,7 @@ mod tests {
let repartition_rr = repartition_exec_round_robin(source);
let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
- let filter = filter_exec(repartition_hash, &schema);
+ let filter = filter_exec(repartition_hash);
let coalesce_batches_exec_2 = coalesce_batches_exec(filter);
let sort =
sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec_2,
true);
@@ -499,15 +502,15 @@ mod tests {
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " FilterExec: c@1 > 3",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
@@ -522,7 +525,7 @@ mod tests {
let source = csv_exec_sorted(&schema, sort_exprs, true);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
- let filter = filter_exec(repartition_hash, &schema);
+ let filter = filter_exec(repartition_hash);
let coalesce_batches_exec: Arc<dyn ExecutionPlan> =
coalesce_batches_exec(filter);
let physical_plan: Arc<dyn ExecutionPlan> =
@@ -530,14 +533,14 @@ mod tests {
let expected_input = ["CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -551,7 +554,7 @@ mod tests {
let source = csv_exec_sorted(&schema, sort_exprs, true);
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
- let filter = filter_exec(repartition_hash, &schema);
+ let filter = filter_exec(repartition_hash);
let coalesce_batches = coalesce_batches_exec(filter);
let repartition_hash_2 = repartition_exec_hash(coalesce_batches);
let sort = sort_exec(vec![sort_expr("a", &schema)],
repartition_hash_2, true);
@@ -562,19 +565,19 @@ mod tests {
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " FilterExec: c@1 > 3",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1],
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
];
@@ -590,22 +593,24 @@ mod tests {
let repartition_rr = repartition_exec_round_robin(source);
let repartition_hash = repartition_exec_hash(repartition_rr);
let sort = sort_exec(
- vec![sort_expr_default("c", &schema)],
+ vec![sort_expr_default("c", &repartition_hash.schema())],
repartition_hash,
true,
);
- let physical_plan =
- sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)],
sort);
+ let physical_plan = sort_preserving_merge_exec(
+ vec![sort_expr_default("c", &sort.schema())],
+ sort,
+ );
- let expected_input = ["SortPreservingMergeExec: [c@2 ASC]",
- " SortExec: expr=[c@2 ASC]",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ let expected_input = ["SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
- let expected_optimized = ["SortPreservingMergeExec: [c@2 ASC]",
- " SortExec: expr=[c@2 ASC]",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ let expected_optimized = ["SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -625,11 +630,11 @@ mod tests {
let expected_input = ["SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalescePartitionsExec",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -645,39 +650,42 @@ mod tests {
let repartition_hash = repartition_exec_hash(repartition_rr);
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let sort = sort_exec(
- vec![sort_expr_default("c", &schema)],
+ vec![sort_expr_default("c", &coalesce_partitions.schema())],
coalesce_partitions,
false,
);
let repartition_rr2 = repartition_exec_round_robin(sort);
let repartition_hash2 = repartition_exec_hash(repartition_rr2);
- let filter = filter_exec(repartition_hash2, &schema);
- let sort2 = sort_exec(vec![sort_expr_default("c", &schema)], filter,
true);
+ let filter = filter_exec(repartition_hash2);
+ let sort2 =
+ sort_exec(vec![sort_expr_default("c", &filter.schema())], filter,
true);
- let physical_plan =
- sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)],
sort2);
+ let physical_plan = sort_preserving_merge_exec(
+ vec![sort_expr_default("c", &sort2.schema())],
+ sort2,
+ );
let expected_input = [
- "SortPreservingMergeExec: [c@2 ASC]",
- " SortExec: expr=[c@2 ASC]",
- " FilterExec: c@2 > 3",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " SortExec: expr=[c@1 ASC]",
+ " FilterExec: c@1 > 3",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " SortExec: expr=[c@2 ASC]",
+ " SortExec: expr=[c@1 ASC]",
" CoalescePartitionsExec",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
];
let expected_optimized = [
- "SortPreservingMergeExec: [c@2 ASC]",
- " FilterExec: c@2 > 3",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=c@2 ASC",
+ "SortPreservingMergeExec: [c@1 ASC]",
+ " FilterExec: c@1 > 3",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=c@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
- " SortExec: expr=[c@2 ASC]",
+ " SortExec: expr=[c@1 ASC]",
" CoalescePartitionsExec",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
];
@@ -705,21 +713,27 @@ mod tests {
let hash_join_exec =
hash_join_exec(left_coalesce_partitions,
right_coalesce_partitions);
- let sort = sort_exec(vec![sort_expr_default("a", &schema)],
hash_join_exec, true);
+ let sort = sort_exec(
+ vec![sort_expr_default("a", &hash_join_exec.schema())],
+ hash_join_exec,
+ true,
+ );
- let physical_plan =
- sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)],
sort);
+ let physical_plan = sort_preserving_merge_exec(
+ vec![sort_expr_default("a", &sort.schema())],
+ sort,
+ );
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortExec: expr=[a@0 ASC]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
c@1)]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
];
@@ -729,11 +743,11 @@ mod tests {
" SortExec: expr=[a@0 ASC]",
" HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
c@1)]",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
" CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS
LAST], has_header=true",
];
@@ -754,11 +768,11 @@ mod tests {
let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
- " RepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8",
+ " RepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS
LAST]",
- " SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+ " SortPreservingRepartitionExec: partitioning=Hash([c@1], 8),
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]},
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
assert_optimized!(expected_input, expected_optimized, physical_plan,
true);
@@ -821,24 +835,23 @@ mod tests {
}
fn repartition_exec_hash(input: Arc<dyn ExecutionPlan>) -> Arc<dyn
ExecutionPlan> {
+ let input_schema = input.schema();
Arc::new(
RepartitionExec::try_new(
input,
- Partitioning::Hash(vec![Arc::new(Column::new("c1", 0))], 8),
+ Partitioning::Hash(vec![col("c", &input_schema).unwrap()], 8),
)
.unwrap(),
)
}
- fn filter_exec(
- input: Arc<dyn ExecutionPlan>,
- schema: &SchemaRef,
- ) -> Arc<dyn ExecutionPlan> {
+ fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+ let input_schema = input.schema();
let predicate = expressions::binary(
- col("c", schema).unwrap(),
+ col("c", &input_schema).unwrap(),
Operator::Gt,
expressions::lit(3i32),
- schema,
+ &input_schema,
)
.unwrap();
Arc::new(FilterExec::try_new(predicate, input).unwrap())
@@ -856,11 +869,15 @@ mod tests {
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
+ let left_on = col("c", &left.schema()).unwrap();
+ let right_on = col("c", &right.schema()).unwrap();
+ let left_col = left_on.as_any().downcast_ref::<Column>().unwrap();
+ let right_col = right_on.as_any().downcast_ref::<Column>().unwrap();
Arc::new(
HashJoinExec::try_new(
left,
right,
- vec![(Column::new("c", 1), Column::new("c", 1))],
+ vec![(left_col.clone(), right_col.clone())],
None,
&JoinType::Inner,
PartitionMode::Partitioned,