This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c6f5c5310 Fix column indices in the planning tests (#8191)
4c6f5c5310 is described below

commit 4c6f5c5310f82e7aed3c5634b4d6c58d8780d9e5
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Nov 16 10:01:20 2023 +0300

    Fix column indices in the planning tests (#8191)
---
 .../src/physical_optimizer/enforce_distribution.rs |  10 +-
 .../replace_with_order_preserving_variants.rs      | 185 +++++++++++----------
 2 files changed, 108 insertions(+), 87 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 12f27ab18f..4aedc3b0d1 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -3794,7 +3794,11 @@ pub(crate) mod tests {
             sort_key,
             projection_exec_with_alias(
                 filter_exec(parquet_exec()),
-                vec![("a".to_string(), "a".to_string())],
+                vec![
+                    ("a".to_string(), "a".to_string()),
+                    ("b".to_string(), "b".to_string()),
+                    ("c".to_string(), "c".to_string()),
+                ],
             ),
             false,
         );
@@ -3803,7 +3807,7 @@ pub(crate) mod tests {
             "SortPreservingMergeExec: [c@2 ASC]",
             // Expect repartition on the input to the sort (as it can benefit 
from additional parallelism)
             "SortExec: expr=[c@2 ASC]",
-            "ProjectionExec: expr=[a@0 as a]",
+            "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
             "FilterExec: c@2 = 0",
             // repartition is lowest down
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
@@ -3815,7 +3819,7 @@ pub(crate) mod tests {
         let expected_first_sort_enforcement = &[
             "SortExec: expr=[c@2 ASC]",
             "CoalescePartitionsExec",
-            "ProjectionExec: expr=[a@0 as a]",
+            "ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]",
             "FilterExec: c@2 = 0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
             "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e]",
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 58806be6d4..7f8c9b852c 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -359,11 +359,11 @@ mod tests {
 
         let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -379,38 +379,41 @@ mod tests {
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
         let sort = sort_exec(
-            vec![sort_expr_default("a", &schema)],
+            vec![sort_expr_default("a", &coalesce_partitions.schema())],
             coalesce_partitions,
             false,
         );
         let repartition_rr2 = repartition_exec_round_robin(sort);
         let repartition_hash2 = repartition_exec_hash(repartition_rr2);
-        let filter = filter_exec(repartition_hash2, &schema);
-        let sort2 = sort_exec(vec![sort_expr_default("a", &schema)], filter, 
true);
+        let filter = filter_exec(repartition_hash2);
+        let sort2 =
+            sort_exec(vec![sort_expr_default("a", &filter.schema())], filter, 
true);
 
-        let physical_plan =
-            sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], 
sort2);
+        let physical_plan = sort_preserving_merge_exec(
+            vec![sort_expr_default("a", &sort2.schema())],
+            sort2,
+        );
 
         let expected_input = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  SortExec: expr=[a@0 ASC]",
-            "    FilterExec: c@2 > 3",
-            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          SortExec: expr=[a@0 ASC]",
             "            CoalescePartitionsExec",
-            "              RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "              RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], 
has_header=true",
         ];
 
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC]",
-            "  FilterExec: c@2 > 3",
-            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC",
+            "  FilterExec: c@1 > 3",
+            "    SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        SortPreservingMergeExec: [a@0 ASC]",
-            "          SortPreservingRepartitionExec: 
partitioning=Hash([c1@0], 8), input_partitions=8, sort_exprs=a@0 ASC",
+            "          SortPreservingRepartitionExec: partitioning=Hash([c@1], 
8), input_partitions=8, sort_exprs=a@0 ASC",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], 
has_header=true",
         ];
@@ -424,7 +427,7 @@ mod tests {
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = csv_exec_sorted(&schema, sort_exprs, true);
         let repartition_rr = repartition_exec_round_robin(source);
-        let filter = filter_exec(repartition_rr, &schema);
+        let filter = filter_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(filter);
         let sort = sort_exec(vec![sort_expr("a", &schema)], repartition_hash, 
true);
 
@@ -433,14 +436,14 @@ mod tests {
 
         let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
-            "      FilterExec: c@2 > 3",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
+            "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
-            "    FilterExec: c@2 > 3",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
 
@@ -456,7 +459,7 @@ mod tests {
         let source = csv_exec_sorted(&schema, sort_exprs, true);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
-        let filter = filter_exec(repartition_hash, &schema);
+        let filter = filter_exec(repartition_hash);
         let coalesce_batches_exec: Arc<dyn ExecutionPlan> = 
coalesce_batches_exec(filter);
         let sort = sort_exec(vec![sort_expr("a", &schema)], 
coalesce_batches_exec, true);
 
@@ -466,14 +469,14 @@ mod tests {
         let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    CoalesceBatchesExec: target_batch_size=8192",
-            "      FilterExec: c@2 > 3",
-            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
             "  CoalesceBatchesExec: target_batch_size=8192",
-            "    FilterExec: c@2 > 3",
-            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "    FilterExec: c@1 > 3",
+            "      SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -488,7 +491,7 @@ mod tests {
         let repartition_rr = repartition_exec_round_robin(source);
         let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
-        let filter = filter_exec(repartition_hash, &schema);
+        let filter = filter_exec(repartition_hash);
         let coalesce_batches_exec_2 = coalesce_batches_exec(filter);
         let sort =
             sort_exec(vec![sort_expr("a", &schema)], coalesce_batches_exec_2, 
true);
@@ -499,15 +502,15 @@ mod tests {
         let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    CoalesceBatchesExec: target_batch_size=8192",
-            "      FilterExec: c@2 > 3",
-            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "      FilterExec: c@1 > 3",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          CoalesceBatchesExec: target_batch_size=8192",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
             "  CoalesceBatchesExec: target_batch_size=8192",
-            "    FilterExec: c@2 > 3",
-            "      SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "    FilterExec: c@1 > 3",
+            "      SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "        CoalesceBatchesExec: target_batch_size=8192",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
@@ -522,7 +525,7 @@ mod tests {
         let source = csv_exec_sorted(&schema, sort_exprs, true);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
-        let filter = filter_exec(repartition_hash, &schema);
+        let filter = filter_exec(repartition_hash);
         let coalesce_batches_exec: Arc<dyn ExecutionPlan> = 
coalesce_batches_exec(filter);
 
         let physical_plan: Arc<dyn ExecutionPlan> =
@@ -530,14 +533,14 @@ mod tests {
 
         let expected_input = ["CoalescePartitionsExec",
             "  CoalesceBatchesExec: target_batch_size=8192",
-            "    FilterExec: c@2 > 3",
-            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["CoalescePartitionsExec",
             "  CoalesceBatchesExec: target_batch_size=8192",
-            "    FilterExec: c@2 > 3",
-            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -551,7 +554,7 @@ mod tests {
         let source = csv_exec_sorted(&schema, sort_exprs, true);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
-        let filter = filter_exec(repartition_hash, &schema);
+        let filter = filter_exec(repartition_hash);
         let coalesce_batches = coalesce_batches_exec(filter);
         let repartition_hash_2 = repartition_exec_hash(coalesce_batches);
         let sort = sort_exec(vec![sort_expr("a", &schema)], 
repartition_hash_2, true);
@@ -562,19 +565,19 @@ mod tests {
         let expected_input = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      CoalesceBatchesExec: target_batch_size=8192",
-            "        FilterExec: c@2 > 3",
-            "          RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        FilterExec: c@1 > 3",
+            "          RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    CoalesceBatchesExec: target_batch_size=8192",
-            "      FilterExec: c@2 > 3",
-            "        SortPreservingRepartitionExec: partitioning=Hash([c1@0], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "      FilterExec: c@1 > 3",
+            "        SortPreservingRepartitionExec: partitioning=Hash([c@1], 
8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
         ];
@@ -590,22 +593,24 @@ mod tests {
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let sort = sort_exec(
-            vec![sort_expr_default("c", &schema)],
+            vec![sort_expr_default("c", &repartition_hash.schema())],
             repartition_hash,
             true,
         );
 
-        let physical_plan =
-            sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], 
sort);
+        let physical_plan = sort_preserving_merge_exec(
+            vec![sort_expr_default("c", &sort.schema())],
+            sort,
+        );
 
-        let expected_input = ["SortPreservingMergeExec: [c@2 ASC]",
-            "  SortExec: expr=[c@2 ASC]",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+        let expected_input = ["SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
-        let expected_optimized = ["SortPreservingMergeExec: [c@2 ASC]",
-            "  SortExec: expr=[c@2 ASC]",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+        let expected_optimized = ["SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -625,11 +630,11 @@ mod tests {
 
         let expected_input = ["SortExec: expr=[a@0 ASC NULLS LAST]",
             "  CoalescePartitionsExec",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
@@ -645,39 +650,42 @@ mod tests {
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
         let sort = sort_exec(
-            vec![sort_expr_default("c", &schema)],
+            vec![sort_expr_default("c", &coalesce_partitions.schema())],
             coalesce_partitions,
             false,
         );
         let repartition_rr2 = repartition_exec_round_robin(sort);
         let repartition_hash2 = repartition_exec_hash(repartition_rr2);
-        let filter = filter_exec(repartition_hash2, &schema);
-        let sort2 = sort_exec(vec![sort_expr_default("c", &schema)], filter, 
true);
+        let filter = filter_exec(repartition_hash2);
+        let sort2 =
+            sort_exec(vec![sort_expr_default("c", &filter.schema())], filter, 
true);
 
-        let physical_plan =
-            sort_preserving_merge_exec(vec![sort_expr_default("c", &schema)], 
sort2);
+        let physical_plan = sort_preserving_merge_exec(
+            vec![sort_expr_default("c", &sort2.schema())],
+            sort2,
+        );
 
         let expected_input = [
-            "SortPreservingMergeExec: [c@2 ASC]",
-            "  SortExec: expr=[c@2 ASC]",
-            "    FilterExec: c@2 > 3",
-            "      RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  SortExec: expr=[c@1 ASC]",
+            "    FilterExec: c@1 > 3",
+            "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          SortExec: expr=[c@2 ASC]",
+            "          SortExec: expr=[c@1 ASC]",
             "            CoalescePartitionsExec",
-            "              RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "              RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
         ];
 
         let expected_optimized = [
-            "SortPreservingMergeExec: [c@2 ASC]",
-            "  FilterExec: c@2 > 3",
-            "    SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=c@2 ASC",
+            "SortPreservingMergeExec: [c@1 ASC]",
+            "  FilterExec: c@1 > 3",
+            "    SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=c@1 ASC",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        SortExec: expr=[c@2 ASC]",
+            "        SortExec: expr=[c@1 ASC]",
             "          CoalescePartitionsExec",
-            "            RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "            RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "              RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "                CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
         ];
@@ -705,21 +713,27 @@ mod tests {
 
         let hash_join_exec =
             hash_join_exec(left_coalesce_partitions, 
right_coalesce_partitions);
-        let sort = sort_exec(vec![sort_expr_default("a", &schema)], 
hash_join_exec, true);
+        let sort = sort_exec(
+            vec![sort_expr_default("a", &hash_join_exec.schema())],
+            hash_join_exec,
+            true,
+        );
 
-        let physical_plan =
-            sort_preserving_merge_exec(vec![sort_expr_default("a", &schema)], 
sort);
+        let physical_plan = sort_preserving_merge_exec(
+            vec![sort_expr_default("a", &sort.schema())],
+            sort,
+        );
 
         let expected_input = [
             "SortPreservingMergeExec: [a@0 ASC]",
             "  SortExec: expr=[a@0 ASC]",
             "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, 
c@1)]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
         ];
@@ -729,11 +743,11 @@ mod tests {
             "  SortExec: expr=[a@0 ASC]",
             "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, 
c@1)]",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
             "      CoalesceBatchesExec: target_batch_size=4096",
-            "        RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS 
LAST], has_header=true",
         ];
@@ -754,11 +768,11 @@ mod tests {
 
         let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
-            "    RepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8",
+            "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
         let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC NULLS 
LAST]",
-            "  SortPreservingRepartitionExec: partitioning=Hash([c1@0], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
+            "  SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
             "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true"];
         assert_optimized!(expected_input, expected_optimized, physical_plan, 
true);
@@ -821,24 +835,23 @@ mod tests {
     }
 
     fn repartition_exec_hash(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        let input_schema = input.schema();
         Arc::new(
             RepartitionExec::try_new(
                 input,
-                Partitioning::Hash(vec![Arc::new(Column::new("c1", 0))], 8),
+                Partitioning::Hash(vec![col("c", &input_schema).unwrap()], 8),
             )
             .unwrap(),
         )
     }
 
-    fn filter_exec(
-        input: Arc<dyn ExecutionPlan>,
-        schema: &SchemaRef,
-    ) -> Arc<dyn ExecutionPlan> {
+    fn filter_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        let input_schema = input.schema();
         let predicate = expressions::binary(
-            col("c", schema).unwrap(),
+            col("c", &input_schema).unwrap(),
             Operator::Gt,
             expressions::lit(3i32),
-            schema,
+            &input_schema,
         )
         .unwrap();
         Arc::new(FilterExec::try_new(predicate, input).unwrap())
@@ -856,11 +869,15 @@ mod tests {
         left: Arc<dyn ExecutionPlan>,
         right: Arc<dyn ExecutionPlan>,
     ) -> Arc<dyn ExecutionPlan> {
+        let left_on = col("c", &left.schema()).unwrap();
+        let right_on = col("c", &right.schema()).unwrap();
+        let left_col = left_on.as_any().downcast_ref::<Column>().unwrap();
+        let right_col = right_on.as_any().downcast_ref::<Column>().unwrap();
         Arc::new(
             HashJoinExec::try_new(
                 left,
                 right,
-                vec![(Column::new("c", 1), Column::new("c", 1))],
+                vec![(left_col.clone(), right_col.clone())],
                 None,
                 &JoinType::Inner,
                 PartitionMode::Partitioned,

Reply via email to the mailing list.