This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new a9fd64d8 Minor refactor to reduce duplicate code (#659)
a9fd64d8 is described below

commit a9fd64d86280f80f0c634a8eb26703483e15539c
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 13 00:48:22 2023 -0700

    Minor refactor to reduce duplicate code (#659)
---
 ballista/scheduler/src/planner.rs | 53 +++++++++++++--------------------------
 1 file changed, 17 insertions(+), 36 deletions(-)

diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index dc069872..538283cd 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -16,8 +16,6 @@
 // under the License.
 
 //! Distributed query execution
-//!
-//! This code is EXPERIMENTAL and still under development
 
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -84,7 +82,6 @@ impl DistributedPlanner {
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<PartialQueryStageResult> {
-        // async move {
         // recurse down and replace children
         if execution_plan.children().is_empty() {
             return Ok((execution_plan, vec![]));
@@ -109,17 +106,7 @@ impl DistributedPlanner {
                 children[0].clone(),
                 None,
             )?;
-            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                shuffle_writer.stage_id(),
-                shuffle_writer.schema(),
-                shuffle_writer.output_partitioning().partition_count(),
-                shuffle_writer
-                    .shuffle_output_partitioning()
-                    .map(|p| p.partition_count())
-                    .unwrap_or_else(|| {
-                        shuffle_writer.output_partitioning().partition_count()
-                    }),
-            ));
+            let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer);
             stages.push(shuffle_writer);
             Ok((
                 with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
@@ -135,17 +122,7 @@ impl DistributedPlanner {
                 children[0].clone(),
                 None,
             )?;
-            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                shuffle_writer.stage_id(),
-                shuffle_writer.schema(),
-                shuffle_writer.output_partitioning().partition_count(),
-                shuffle_writer
-                    .shuffle_output_partitioning()
-                    .map(|p| p.partition_count())
-                    .unwrap_or_else(|| {
-                        shuffle_writer.output_partitioning().partition_count()
-                    }),
-            ));
+            let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer);
             stages.push(shuffle_writer);
             Ok((
                 with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
@@ -162,17 +139,7 @@ impl DistributedPlanner {
                         children[0].clone(),
                         Some(repart.partitioning().to_owned()),
                     )?;
-                    let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                        shuffle_writer.stage_id(),
-                        shuffle_writer.schema(),
-                        shuffle_writer.output_partitioning().partition_count(),
-                        shuffle_writer
-                            .shuffle_output_partitioning()
-                            .map(|p| p.partition_count())
-                            .unwrap_or_else(|| {
-                                shuffle_writer.output_partitioning().partition_count()
-                            }),
-                    ));
+                    let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer);
                     stages.push(shuffle_writer);
                     Ok((unresolved_shuffle, stages))
                 }
@@ -202,6 +169,20 @@ impl DistributedPlanner {
     }
 }
 
+fn create_unresolved_shuffle(
+    shuffle_writer: &ShuffleWriterExec,
+) -> Arc<UnresolvedShuffleExec> {
+    Arc::new(UnresolvedShuffleExec::new(
+        shuffle_writer.stage_id(),
+        shuffle_writer.schema(),
+        shuffle_writer.output_partitioning().partition_count(),
+        shuffle_writer
+            .shuffle_output_partitioning()
+            .map(|p| p.partition_count())
+            .unwrap_or_else(|| shuffle_writer.output_partitioning().partition_count()),
+    ))
+}
+
 /// Returns the unresolved shuffles in the execution plan
 pub fn find_unresolved_shuffles(
     plan: &Arc<dyn ExecutionPlan>,

Reply via email to