This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e65d49c  UnresolvedShuffleExec should represent a single shuffle (#727)
e65d49c is described below

commit e65d49ca7d05916591c051de1fccc0764dd18323
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 16 06:55:18 2021 -0600

    UnresolvedShuffleExec should represent a single shuffle (#727)
---
 ballista/rust/core/proto/ballista.proto            |  2 +-
 .../core/src/execution_plans/unresolved_shuffle.rs | 14 ++--
 .../core/src/serde/physical_plan/from_proto.rs     |  6 +-
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  6 +-
 ballista/rust/core/src/utils.rs                    | 12 ++--
 ballista/rust/scheduler/src/planner.rs             | 30 ++++----
 ballista/rust/scheduler/src/state/mod.rs           | 83 +++++++++++-----------
 7 files changed, 68 insertions(+), 85 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 5b3e93e..1c2328e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -542,7 +542,7 @@ message PhysicalNegativeNode {
 }
 
 message UnresolvedShuffleExecNode {
-  repeated uint32 query_stage_ids = 1;
+  uint32 stage_id = 1;
   Schema schema = 2;
   uint32 partition_count = 3;
 }
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs 
b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 49b4f7a..cb351ee 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -31,14 +31,14 @@ use datafusion::{
 use log::info;
 use std::fmt::Formatter;
 
-/// UnresolvedShuffleExec represents a dependency on the results of several 
ShuffleWriterExec nodes which haven't been computed yet.
+/// UnresolvedShuffleExec represents a dependency on the results of a ShuffleWriterExec node which hasn't been computed yet.
 ///
 /// An ExecutionPlan that contains an UnresolvedShuffleExec isn't ready for 
execution. The presence of this ExecutionPlan
-/// is used as a signal so the scheduler knows it can't start computation on a 
specific ShuffleWriterExec.
+/// is used as a signal so the scheduler knows it can't start computation 
until the dependent shuffle has completed.
 #[derive(Debug, Clone)]
 pub struct UnresolvedShuffleExec {
     // The query stage ids which needs to be computed
-    pub query_stage_ids: Vec<usize>,
+    pub stage_id: usize,
 
     // The schema this node will have once it is replaced with a 
ShuffleReaderExec
     pub schema: SchemaRef,
@@ -49,13 +49,9 @@ pub struct UnresolvedShuffleExec {
 
 impl UnresolvedShuffleExec {
     /// Create a new UnresolvedShuffleExec
-    pub fn new(
-        query_stage_ids: Vec<usize>,
-        schema: SchemaRef,
-        partition_count: usize,
-    ) -> Self {
+    pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> 
Self {
         Self {
-            query_stage_ids,
+            stage_id,
             schema,
             partition_count,
         }
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index a1a60bd..4b0a984 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -464,11 +464,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for 
&protobuf::PhysicalPlanNode {
             PhysicalPlanType::Unresolved(unresolved_shuffle) => {
                 let schema = 
Arc::new(convert_required!(unresolved_shuffle.schema)?);
                 Ok(Arc::new(UnresolvedShuffleExec {
-                    query_stage_ids: unresolved_shuffle
-                        .query_stage_ids
-                        .iter()
-                        .map(|id| *id as usize)
-                        .collect(),
+                    stage_id: unresolved_shuffle.stage_id as usize,
                     schema,
                     partition_count: unresolved_shuffle.partition_count as 
usize,
                 }))
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs 
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index cdd33f9..0429efb 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -391,11 +391,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn 
ExecutionPlan> {
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Unresolved(
                     protobuf::UnresolvedShuffleExecNode {
-                        query_stage_ids: exec
-                            .query_stage_ids
-                            .iter()
-                            .map(|id| *id as u32)
-                            .collect(),
+                        stage_id: exec.stage_id as u32,
                         schema: Some(exec.schema().as_ref().into()),
                         partition_count: exec.partition_count as u32,
                     },
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 7e9a55a..8b1cf61 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -209,13 +209,11 @@ fn build_exec_plan_diagram(
     for child in plan.children() {
         if let Some(shuffle) = 
child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
             if !draw_entity {
-                for y in &shuffle.query_stage_ids {
-                    writeln!(
-                        w,
-                        "\tstage_{}_exec_1 -> stage_{}_exec_{};",
-                        y, stage_id, node_id
-                    )?;
-                }
+                writeln!(
+                    w,
+                    "\tstage_{}_exec_1 -> stage_{}_exec_{};",
+                    shuffle.stage_id, stage_id, node_id
+                )?;
             }
         } else {
             // relationships within same entity
diff --git a/ballista/rust/scheduler/src/planner.rs 
b/ballista/rust/scheduler/src/planner.rs
index 3195261..3f90da2 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -112,7 +112,7 @@ impl DistributedPlanner {
                 None,
             )?;
             let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                vec![query_stage.stage_id()],
+                query_stage.stage_id(),
                 query_stage.schema(),
                 query_stage.output_partitioning().partition_count(),
             ));
@@ -131,7 +131,7 @@ impl DistributedPlanner {
                 Some(repart.partitioning().to_owned()),
             )?;
             let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                vec![query_stage.stage_id()],
+                query_stage.stage_id(),
                 query_stage.schema(),
                 query_stage.output_partitioning().partition_count(),
             ));
@@ -166,19 +166,17 @@ pub fn remove_unresolved_shuffles(
             child.as_any().downcast_ref::<UnresolvedShuffleExec>()
         {
             let mut relevant_locations = vec![];
-            for id in &unresolved_shuffle.query_stage_ids {
-                relevant_locations.append(
-                    &mut partition_locations
-                        .get(id)
-                        .ok_or_else(|| {
-                            BallistaError::General(
-                                "Missing partition location. Could not remove 
unresolved shuffles"
-                                    .to_owned(),
-                            )
-                        })?
-                        .clone(),
-                );
-            }
+            relevant_locations.append(
+                &mut partition_locations
+                    .get(&unresolved_shuffle.stage_id)
+                    .ok_or_else(|| {
+                        BallistaError::General(
+                            "Missing partition location. Could not remove 
unresolved shuffles"
+                                .to_owned(),
+                        )
+                    })?
+                    .clone(),
+            );
             new_children.push(Arc::new(ShuffleReaderExec::try_new(
                 relevant_locations,
                 unresolved_shuffle.schema().clone(),
@@ -297,7 +295,7 @@ mod test {
         let unresolved_shuffle = coalesce_partitions.children()[0].clone();
         let unresolved_shuffle =
             downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
-        assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
+        assert_eq!(unresolved_shuffle.stage_id, 2);
 
         Ok(())
     }
diff --git a/ballista/rust/scheduler/src/state/mod.rs 
b/ballista/rust/scheduler/src/state/mod.rs
index cbee3f1..3ddbced 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -302,48 +302,47 @@ impl SchedulerState {
                     
Vec<Vec<ballista_core::serde::scheduler::PartitionLocation>>,
                 > = HashMap::new();
                 for unresolved_shuffle in unresolved_shuffles {
-                    for stage_id in unresolved_shuffle.query_stage_ids {
-                        for partition_id in 
0..unresolved_shuffle.partition_count {
-                            let referenced_task = tasks
-                                .get(&get_task_status_key(
-                                    &self.namespace,
-                                    &partition.job_id,
-                                    stage_id,
-                                    partition_id,
-                                ))
-                                .unwrap();
-                            let task_is_dead = self
-                                .reschedule_dead_task(&referenced_task, 
&executors)
-                                .await?;
-                            if task_is_dead {
-                                continue 'tasks;
-                            } else if let Some(task_status::Status::Completed(
-                                CompletedTask { executor_id },
-                            )) = &referenced_task.status
-                            {
-                                let empty = vec![];
-                                let locations =
-                                    
partition_locations.entry(stage_id).or_insert(empty);
-                                let executor_meta = executors
-                                    .iter()
-                                    .find(|exec| exec.id == *executor_id)
-                                    .unwrap()
-                                    .clone();
-                                locations.push(vec![
-                                    
ballista_core::serde::scheduler::PartitionLocation {
-                                        partition_id:
-                                            
ballista_core::serde::scheduler::PartitionId {
-                                                job_id: 
partition.job_id.clone(),
-                                                stage_id,
-                                                partition_id,
-                                            },
-                                        executor_meta,
-                                        partition_stats: 
PartitionStats::default(),
-                                    },
-                                ]);
-                            } else {
-                                continue 'tasks;
-                            }
+                    for partition_id in 0..unresolved_shuffle.partition_count {
+                        let referenced_task = tasks
+                            .get(&get_task_status_key(
+                                &self.namespace,
+                                &partition.job_id,
+                                unresolved_shuffle.stage_id,
+                                partition_id,
+                            ))
+                            .unwrap();
+                        let task_is_dead = self
+                            .reschedule_dead_task(&referenced_task, &executors)
+                            .await?;
+                        if task_is_dead {
+                            continue 'tasks;
+                        } else if let Some(task_status::Status::Completed(
+                            CompletedTask { executor_id },
+                        )) = &referenced_task.status
+                        {
+                            let empty = vec![];
+                            let locations = partition_locations
+                                .entry(unresolved_shuffle.stage_id)
+                                .or_insert(empty);
+                            let executor_meta = executors
+                                .iter()
+                                .find(|exec| exec.id == *executor_id)
+                                .unwrap()
+                                .clone();
+                            locations.push(vec![
+                                
ballista_core::serde::scheduler::PartitionLocation {
+                                    partition_id:
+                                        
ballista_core::serde::scheduler::PartitionId {
+                                            job_id: partition.job_id.clone(),
+                                            stage_id: 
unresolved_shuffle.stage_id,
+                                            partition_id,
+                                        },
+                                    executor_meta,
+                                    partition_stats: PartitionStats::default(),
+                                },
+                            ]);
+                        } else {
+                            continue 'tasks;
                         }
                     }
                 }

Reply via email to