This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new a9fd64d8 Minor refactor to reduce duplicate code (#659)
a9fd64d8 is described below
commit a9fd64d86280f80f0c634a8eb26703483e15539c
Author: Andy Grove <[email protected]>
AuthorDate: Mon Feb 13 00:48:22 2023 -0700
Minor refactor to reduce duplicate code (#659)
---
ballista/scheduler/src/planner.rs | 53 +++++++++++++--------------------------
1 file changed, 17 insertions(+), 36 deletions(-)
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index dc069872..538283cd 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -16,8 +16,6 @@
// under the License.
//! Distributed query execution
-//!
-//! This code is EXPERIMENTAL and still under development
use std::collections::HashMap;
use std::sync::Arc;
@@ -84,7 +82,6 @@ impl DistributedPlanner {
job_id: &'a str,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Result<PartialQueryStageResult> {
- // async move {
// recurse down and replace children
if execution_plan.children().is_empty() {
return Ok((execution_plan, vec![]));
@@ -109,17 +106,7 @@ impl DistributedPlanner {
children[0].clone(),
None,
)?;
- let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- shuffle_writer.stage_id(),
- shuffle_writer.schema(),
- shuffle_writer.output_partitioning().partition_count(),
- shuffle_writer
- .shuffle_output_partitioning()
- .map(|p| p.partition_count())
- .unwrap_or_else(|| {
- shuffle_writer.output_partitioning().partition_count()
- }),
- ));
+ let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
@@ -135,17 +122,7 @@ impl DistributedPlanner {
children[0].clone(),
None,
)?;
- let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- shuffle_writer.stage_id(),
- shuffle_writer.schema(),
- shuffle_writer.output_partitioning().partition_count(),
- shuffle_writer
- .shuffle_output_partitioning()
- .map(|p| p.partition_count())
- .unwrap_or_else(|| {
- shuffle_writer.output_partitioning().partition_count()
- }),
- ));
+ let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
@@ -162,17 +139,7 @@ impl DistributedPlanner {
children[0].clone(),
Some(repart.partitioning().to_owned()),
)?;
- let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- shuffle_writer.stage_id(),
- shuffle_writer.schema(),
- shuffle_writer.output_partitioning().partition_count(),
- shuffle_writer
- .shuffle_output_partitioning()
- .map(|p| p.partition_count())
- .unwrap_or_else(|| {
- shuffle_writer.output_partitioning().partition_count()
- }),
- ));
+ let unresolved_shuffle = create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((unresolved_shuffle, stages))
}
@@ -202,6 +169,20 @@ impl DistributedPlanner {
}
}
+fn create_unresolved_shuffle(
+ shuffle_writer: &ShuffleWriterExec,
+) -> Arc<UnresolvedShuffleExec> {
+ Arc::new(UnresolvedShuffleExec::new(
+ shuffle_writer.stage_id(),
+ shuffle_writer.schema(),
+ shuffle_writer.output_partitioning().partition_count(),
+ shuffle_writer
+ .shuffle_output_partitioning()
+ .map(|p| p.partition_count())
+ .unwrap_or_else(|| shuffle_writer.output_partitioning().partition_count()),
+ ))
+}
+
/// Returns the unresolved shuffles in the execution plan
pub fn find_unresolved_shuffles(
plan: &Arc<dyn ExecutionPlan>,