This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e65d49c UnresolvedShuffleExec should represent a single shuffle (#727)
e65d49c is described below
commit e65d49ca7d05916591c051de1fccc0764dd18323
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 16 06:55:18 2021 -0600
UnresolvedShuffleExec should represent a single shuffle (#727)
---
ballista/rust/core/proto/ballista.proto | 2 +-
.../core/src/execution_plans/unresolved_shuffle.rs | 14 ++--
.../core/src/serde/physical_plan/from_proto.rs | 6 +-
.../rust/core/src/serde/physical_plan/to_proto.rs | 6 +-
ballista/rust/core/src/utils.rs | 12 ++--
ballista/rust/scheduler/src/planner.rs | 30 ++++----
ballista/rust/scheduler/src/state/mod.rs | 83 +++++++++++-----------
7 files changed, 68 insertions(+), 85 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 5b3e93e..1c2328e 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -542,7 +542,7 @@ message PhysicalNegativeNode {
}
message UnresolvedShuffleExecNode {
- repeated uint32 query_stage_ids = 1;
+ uint32 stage_id = 1;
Schema schema = 2;
uint32 partition_count = 3;
}
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 49b4f7a..cb351ee 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -31,14 +31,14 @@ use datafusion::{
use log::info;
use std::fmt::Formatter;
-/// UnresolvedShuffleExec represents a dependency on the results of several
ShuffleWriterExec nodes which haven't been computed yet.
+/// UnresolvedShuffleExec represents a dependency on the results of a
ShuffleWriterExec node which hasn't been computed yet.
///
/// An ExecutionPlan that contains an UnresolvedShuffleExec isn't ready for
execution. The presence of this ExecutionPlan
-/// is used as a signal so the scheduler knows it can't start computation on a
specific ShuffleWriterExec.
+/// is used as a signal so the scheduler knows it can't start computation
until the dependent shuffle has completed.
#[derive(Debug, Clone)]
pub struct UnresolvedShuffleExec {
// The query stage ids which needs to be computed
- pub query_stage_ids: Vec<usize>,
+ pub stage_id: usize,
// The schema this node will have once it is replaced with a
ShuffleReaderExec
pub schema: SchemaRef,
@@ -49,13 +49,9 @@ pub struct UnresolvedShuffleExec {
impl UnresolvedShuffleExec {
/// Create a new UnresolvedShuffleExec
- pub fn new(
- query_stage_ids: Vec<usize>,
- schema: SchemaRef,
- partition_count: usize,
- ) -> Self {
+ pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) ->
Self {
Self {
- query_stage_ids,
+ stage_id,
schema,
partition_count,
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index a1a60bd..4b0a984 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -464,11 +464,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
PhysicalPlanType::Unresolved(unresolved_shuffle) => {
let schema =
Arc::new(convert_required!(unresolved_shuffle.schema)?);
Ok(Arc::new(UnresolvedShuffleExec {
- query_stage_ids: unresolved_shuffle
- .query_stage_ids
- .iter()
- .map(|id| *id as usize)
- .collect(),
+ stage_id: unresolved_shuffle.stage_id as usize,
schema,
partition_count: unresolved_shuffle.partition_count as
usize,
}))
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index cdd33f9..0429efb 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -391,11 +391,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn
ExecutionPlan> {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Unresolved(
protobuf::UnresolvedShuffleExecNode {
- query_stage_ids: exec
- .query_stage_ids
- .iter()
- .map(|id| *id as u32)
- .collect(),
+ stage_id: exec.stage_id as u32,
schema: Some(exec.schema().as_ref().into()),
partition_count: exec.partition_count as u32,
},
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 7e9a55a..8b1cf61 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -209,13 +209,11 @@ fn build_exec_plan_diagram(
for child in plan.children() {
if let Some(shuffle) =
child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
if !draw_entity {
- for y in &shuffle.query_stage_ids {
- writeln!(
- w,
- "\tstage_{}_exec_1 -> stage_{}_exec_{};",
- y, stage_id, node_id
- )?;
- }
+ writeln!(
+ w,
+ "\tstage_{}_exec_1 -> stage_{}_exec_{};",
+ shuffle.stage_id, stage_id, node_id
+ )?;
}
} else {
// relationships within same entity
diff --git a/ballista/rust/scheduler/src/planner.rs
b/ballista/rust/scheduler/src/planner.rs
index 3195261..3f90da2 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -112,7 +112,7 @@ impl DistributedPlanner {
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- vec![query_stage.stage_id()],
+ query_stage.stage_id(),
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
@@ -131,7 +131,7 @@ impl DistributedPlanner {
Some(repart.partitioning().to_owned()),
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
- vec![query_stage.stage_id()],
+ query_stage.stage_id(),
query_stage.schema(),
query_stage.output_partitioning().partition_count(),
));
@@ -166,19 +166,17 @@ pub fn remove_unresolved_shuffles(
child.as_any().downcast_ref::<UnresolvedShuffleExec>()
{
let mut relevant_locations = vec![];
- for id in &unresolved_shuffle.query_stage_ids {
- relevant_locations.append(
- &mut partition_locations
- .get(id)
- .ok_or_else(|| {
- BallistaError::General(
- "Missing partition location. Could not remove
unresolved shuffles"
- .to_owned(),
- )
- })?
- .clone(),
- );
- }
+ relevant_locations.append(
+ &mut partition_locations
+ .get(&unresolved_shuffle.stage_id)
+ .ok_or_else(|| {
+ BallistaError::General(
+ "Missing partition location. Could not remove
unresolved shuffles"
+ .to_owned(),
+ )
+ })?
+ .clone(),
+ );
new_children.push(Arc::new(ShuffleReaderExec::try_new(
relevant_locations,
unresolved_shuffle.schema().clone(),
@@ -297,7 +295,7 @@ mod test {
let unresolved_shuffle = coalesce_partitions.children()[0].clone();
let unresolved_shuffle =
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
- assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
+ assert_eq!(unresolved_shuffle.stage_id, 2);
Ok(())
}
diff --git a/ballista/rust/scheduler/src/state/mod.rs
b/ballista/rust/scheduler/src/state/mod.rs
index cbee3f1..3ddbced 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -302,48 +302,47 @@ impl SchedulerState {
Vec<Vec<ballista_core::serde::scheduler::PartitionLocation>>,
> = HashMap::new();
for unresolved_shuffle in unresolved_shuffles {
- for stage_id in unresolved_shuffle.query_stage_ids {
- for partition_id in
0..unresolved_shuffle.partition_count {
- let referenced_task = tasks
- .get(&get_task_status_key(
- &self.namespace,
- &partition.job_id,
- stage_id,
- partition_id,
- ))
- .unwrap();
- let task_is_dead = self
- .reschedule_dead_task(&referenced_task,
&executors)
- .await?;
- if task_is_dead {
- continue 'tasks;
- } else if let Some(task_status::Status::Completed(
- CompletedTask { executor_id },
- )) = &referenced_task.status
- {
- let empty = vec![];
- let locations =
-
partition_locations.entry(stage_id).or_insert(empty);
- let executor_meta = executors
- .iter()
- .find(|exec| exec.id == *executor_id)
- .unwrap()
- .clone();
- locations.push(vec![
-
ballista_core::serde::scheduler::PartitionLocation {
- partition_id:
-
ballista_core::serde::scheduler::PartitionId {
- job_id:
partition.job_id.clone(),
- stage_id,
- partition_id,
- },
- executor_meta,
- partition_stats:
PartitionStats::default(),
- },
- ]);
- } else {
- continue 'tasks;
- }
+ for partition_id in 0..unresolved_shuffle.partition_count {
+ let referenced_task = tasks
+ .get(&get_task_status_key(
+ &self.namespace,
+ &partition.job_id,
+ unresolved_shuffle.stage_id,
+ partition_id,
+ ))
+ .unwrap();
+ let task_is_dead = self
+ .reschedule_dead_task(&referenced_task, &executors)
+ .await?;
+ if task_is_dead {
+ continue 'tasks;
+ } else if let Some(task_status::Status::Completed(
+ CompletedTask { executor_id },
+ )) = &referenced_task.status
+ {
+ let empty = vec![];
+ let locations = partition_locations
+ .entry(unresolved_shuffle.stage_id)
+ .or_insert(empty);
+ let executor_meta = executors
+ .iter()
+ .find(|exec| exec.id == *executor_id)
+ .unwrap()
+ .clone();
+ locations.push(vec![
+
ballista_core::serde::scheduler::PartitionLocation {
+ partition_id:
+
ballista_core::serde::scheduler::PartitionId {
+ job_id: partition.job_id.clone(),
+ stage_id:
unresolved_shuffle.stage_id,
+ partition_id,
+ },
+ executor_meta,
+ partition_stats: PartitionStats::default(),
+ },
+ ]);
+ } else {
+ continue 'tasks;
}
}
}