thinkharderdev commented on code in PR #153: URL: https://github.com/apache/arrow-ballista/pull/153#discussion_r954350298
########## ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs: ########## @@ -0,0 +1,914 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::metrics::{MetricValue, MetricsSet}; +use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning}; +use datafusion::prelude::SessionContext; +use datafusion_proto::logical_plan::AsLogicalPlan; +use log::{debug, warn}; + +use ballista_core::error::{BallistaError, Result}; +use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; +use ballista_core::serde::protobuf::task_status; +use ballista_core::serde::protobuf::{self, OperatorMetricsSet}; +use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; +use ballista_core::serde::scheduler::PartitionLocation; +use ballista_core::serde::{AsExecutionPlan, BallistaCodec}; + +use crate::display::DisplayableBallistaExecutionPlan; + +/// A stage in the ExecutionGraph, +/// represents a set of tasks (one per each `partition`) which can be executed 
concurrently. +/// For a stage, there are five states. And the state machine is as follows: +/// +/// UnResolvedStage FailedStage +/// ↓ ↙ ↑ +/// ResolvedStage → RunningStage +/// ↓ +/// CompletedStage +#[derive(Clone)] +pub(super) enum ExecutionStage { + UnResolved(UnResolvedStage), + Resolved(ResolvedStage), + Running(RunningStage), + Completed(CompletedStage), + Failed(FailedStage), +} + +impl Debug for ExecutionStage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ExecutionStage::UnResolved(unresolved_stage) => unresolved_stage.fmt(f), + ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f), + ExecutionStage::Running(running_stage) => running_stage.fmt(f), + ExecutionStage::Completed(completed_stage) => completed_stage.fmt(f), + ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f), + } + } +} + +/// For a stage whose input stages are not all completed, we say it's a unresolved stage +#[derive(Clone)] +pub(super) struct UnResolvedStage { Review Comment: ```suggestion pub(super) struct UnresolvedStage { ``` ########## ballista/rust/scheduler/src/state/execution_graph.rs: ########## @@ -175,145 +154,226 @@ impl ExecutionGraph { /// An ExecutionGraph is complete if all its stages are complete pub fn complete(&self) -> bool { - self.stages.values().all(|s| s.complete()) + self.stages + .values() + .all(|s| matches!(s, ExecutionStage::Completed(_))) + } + + /// Revive the execution graph by converting the resolved stages to running changes + /// If existing such change, return true; else false. Review Comment: ```suggestion /// Revive the execution graph by converting the resolved stages to running stages /// If any stages are converted, return true; else false. ``` ########## ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs: ########## @@ -0,0 +1,914 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::metrics::{MetricValue, MetricsSet}; +use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning}; +use datafusion::prelude::SessionContext; +use datafusion_proto::logical_plan::AsLogicalPlan; +use log::{debug, warn}; + +use ballista_core::error::{BallistaError, Result}; +use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; +use ballista_core::serde::protobuf::task_status; +use ballista_core::serde::protobuf::{self, OperatorMetricsSet}; +use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; +use ballista_core::serde::scheduler::PartitionLocation; +use ballista_core::serde::{AsExecutionPlan, BallistaCodec}; + +use crate::display::DisplayableBallistaExecutionPlan; + +/// A stage in the ExecutionGraph, +/// represents a set of tasks (one per each `partition`) which can be executed concurrently. +/// For a stage, there are five states. 
And the state machine is as follows: +/// +/// UnResolvedStage FailedStage +/// ↓ ↙ ↑ +/// ResolvedStage → RunningStage +/// ↓ +/// CompletedStage +#[derive(Clone)] +pub(super) enum ExecutionStage { + UnResolved(UnResolvedStage), + Resolved(ResolvedStage), + Running(RunningStage), + Completed(CompletedStage), + Failed(FailedStage), +} Review Comment: I really like representing this as an enum! ########## ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs: ########## @@ -0,0 +1,914 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::metrics::{MetricValue, MetricsSet}; +use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning}; +use datafusion::prelude::SessionContext; +use datafusion_proto::logical_plan::AsLogicalPlan; +use log::{debug, warn}; + +use ballista_core::error::{BallistaError, Result}; +use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; +use ballista_core::serde::protobuf::task_status; +use ballista_core::serde::protobuf::{self, OperatorMetricsSet}; +use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; +use ballista_core::serde::scheduler::PartitionLocation; +use ballista_core::serde::{AsExecutionPlan, BallistaCodec}; + +use crate::display::DisplayableBallistaExecutionPlan; + +/// A stage in the ExecutionGraph, +/// represents a set of tasks (one per each `partition`) which can be executed concurrently. +/// For a stage, there are five states. 
And the state machine is as follows: +/// +/// UnResolvedStage FailedStage +/// ↓ ↙ ↑ +/// ResolvedStage → RunningStage +/// ↓ +/// CompletedStage +#[derive(Clone)] +pub(super) enum ExecutionStage { + UnResolved(UnResolvedStage), + Resolved(ResolvedStage), + Running(RunningStage), + Completed(CompletedStage), + Failed(FailedStage), +} + +impl Debug for ExecutionStage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ExecutionStage::UnResolved(unresolved_stage) => unresolved_stage.fmt(f), + ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f), + ExecutionStage::Running(running_stage) => running_stage.fmt(f), + ExecutionStage::Completed(completed_stage) => completed_stage.fmt(f), + ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f), + } + } +} + +/// For a stage whose input stages are not all completed, we say it's a unresolved stage +#[derive(Clone)] +pub(super) struct UnResolvedStage { + /// Stage ID + pub(super) stage_id: usize, + /// Output partitioning for this stage. + pub(super) output_partitioning: Option<Partitioning>, + /// Stage ID of the stage that will take this stages outputs as inputs. + /// If `output_links` is empty then this the final stage in the `ExecutionGraph` + pub(super) output_links: Vec<usize>, + /// Represents the outputs from this stage's child stages. + /// This stage can only be resolved an executed once all child stages are completed. + pub(super) inputs: HashMap<usize, StageOutput>, + /// `ExecutionPlan` for this stage + pub(super) plan: Arc<dyn ExecutionPlan>, +} + +/// For a stage, if it has no inputs or all of its input stages are completed, +/// then we it's a resolved stage +#[derive(Clone)] +pub(super) struct ResolvedStage { + /// Stage ID + pub(super) stage_id: usize, + /// Total number of output partitions for this stage. + /// This stage will produce on task for partition. + pub(super) partitions: usize, + /// Output partitioning for this stage. 
+ pub(super) output_partitioning: Option<Partitioning>, + /// Stage ID of the stage that will take this stages outputs as inputs. + /// If `output_links` is empty then this the final stage in the `ExecutionGraph` + pub(super) output_links: Vec<usize>, + /// `ExecutionPlan` for this stage + pub(super) plan: Arc<dyn ExecutionPlan>, +} + +/// Different from the resolved stage, a running stage will +/// 1. save the execution plan as encoded one to avoid serialization cost for creating task definition2 +/// 2. manage the task statuses +/// 3. manage the stage-level combined metrics +/// Running stages will only be maintained in memory and will not saved to the backend storage +#[derive(Clone)] +pub(super) struct RunningStage { + /// Stage ID + pub(super) stage_id: usize, + /// Total number of output partitions for this stage. + /// This stage will produce on task for partition. + pub(super) partitions: usize, + /// Output partitioning for this stage. + pub(super) output_partitioning: Option<Partitioning>, + /// Stage ID of the stage that will take this stages outputs as inputs. + /// If `output_links` is empty then this the final stage in the `ExecutionGraph` + pub(super) output_links: Vec<usize>, + /// `ExecutionPlan` for this stage + pub(super) plan: Arc<dyn ExecutionPlan>, + /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled + pub(super) task_statuses: Vec<Option<task_status::Status>>, + /// Combined metrics of the already finished tasks in the stage, If it is None, no task is finished yet. + pub(super) stage_metrics: Option<Vec<MetricsSet>>, +} + +/// If a stage finishes successfully, its task statuses and metrics will be finalized +#[derive(Clone)] +pub(super) struct CompletedStage { + /// Stage ID + pub(super) stage_id: usize, + /// Total number of output partitions for this stage. + /// This stage will produce on task for partition. + pub(super) partitions: usize, + /// Output partitioning for this stage. 
+ pub(super) output_partitioning: Option<Partitioning>, + /// Stage ID of the stage that will take this stages outputs as inputs. + /// If `output_links` is empty then this the final stage in the `ExecutionGraph` + pub(super) output_links: Vec<usize>, + /// `ExecutionPlan` for this stage + pub(super) plan: Arc<dyn ExecutionPlan>, + /// Status of each already scheduled task. + pub(super) task_statuses: Vec<task_status::Status>, + /// Combined metrics of the already finished tasks in the stage. + pub(super) stage_metrics: Vec<MetricsSet>, +} + +/// If a stage fails, it will be with an error message +#[derive(Clone)] +pub(super) struct FailedStage { + /// Stage ID + pub(super) stage_id: usize, + /// Total number of output partitions for this stage. + /// This stage will produce on task for partition. + pub(super) partitions: usize, + /// Output partitioning for this stage. + pub(super) output_partitioning: Option<Partitioning>, + /// Stage ID of the stage that will take this stages outputs as inputs. + /// If `output_links` is empty then this the final stage in the `ExecutionGraph` + pub(super) output_links: Vec<usize>, + /// `ExecutionPlan` for this stage + pub(super) plan: Arc<dyn ExecutionPlan>, + /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled + pub(super) task_statuses: Vec<Option<task_status::Status>>, + /// Combined metrics of the already finished tasks in the stage, If it is None, no task is finished yet. 
+ pub(super) stage_metrics: Option<Vec<MetricsSet>>, + /// Error message + pub(super) error_message: String, +} + +impl UnResolvedStage { Review Comment: ```suggestion impl UnresolvedStage { ``` ########## ballista/rust/scheduler/src/scheduler_server/event.rs: ########## @@ -36,6 +36,10 @@ pub enum QueryStageSchedulerEvent { plan: Box<LogicalPlan>, }, JobSubmitted(String), - JobFinished(String), + // For a job fails without its execution graph JobFailed(String, String), Review Comment: ```suggestion // For a job which failed during planning JobPlanningFailed(String, String), ``` ########## ballista/rust/scheduler/src/state/task_manager.rs: ########## @@ -55,44 +58,62 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> clients: ExecutorClients, session_builder: SessionBuilder, codec: BallistaCodec<T, U>, + scheduler_id: String, + // Cache for active execution graphs curated by this scheduler + active_job_cache: ExecutionGraphCache, } impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> { pub fn new( state: Arc<dyn StateBackendClient>, session_builder: SessionBuilder, codec: BallistaCodec<T, U>, + scheduler_id: String, ) -> Self { Self { state, clients: Default::default(), session_builder, codec, + scheduler_id, + active_job_cache: Arc::new(RwLock::new(HashMap::new())), } } /// Generate an ExecutionGraph for the job and save it to the persistent state. + /// By default, this job will be curated by the scheduler which receives it. 
+ /// Then we will also save it to the active execution graph pub async fn submit_job( &self, job_id: &str, session_id: &str, plan: Arc<dyn ExecutionPlan>, ) -> Result<()> { - let graph = ExecutionGraph::new(job_id, session_id, plan)?; + let mut graph = + ExecutionGraph::new(&self.scheduler_id, job_id, session_id, plan)?; self.state .put( Keyspace::ActiveJobs, job_id.to_owned(), - self.encode_execution_graph(graph)?, + self.encode_execution_graph(graph.clone())?, ) .await?; + graph.revive(); Review Comment: Why would we need to call `revive` here? ########## ballista/rust/scheduler/src/state/task_manager.rs: ########## @@ -106,123 +127,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> } } - /// Generate a new random Job ID - pub fn generate_job_id(&self) -> String { - let mut rng = thread_rng(); - std::iter::repeat(()) - .map(|()| rng.sample(Alphanumeric)) - .map(char::from) - .take(7) - .collect() - } - - /// Atomically update given task statuses in the respective job and return a tuple containing: + /// Update given task statuses in the respective job and return a tuple containing: /// 1. A list of QueryStageSchedulerEvent to publish. /// 2. A list of reservations that can now be offered. - /// - /// When a task is updated, there may or may not be more tasks pending for its job. If there are more - /// tasks pending then we want to reschedule one of those tasks on the same task slot. In that case - /// we will set the `job_id` on the `ExecutorReservation` so the scheduler attempts to assign tasks from - /// the same job. Note that when the scheduler attempts to fill the reservation, there is no guarantee - /// that the available task is still available. 
pub(crate) async fn update_task_statuses( &self, executor: &ExecutorMetadata, task_status: Vec<TaskStatus>, ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> { - let lock = self.state.lock(Keyspace::ActiveJobs, "").await?; - - with_lock(lock, async { - let mut events: Vec<QueryStageSchedulerEvent> = vec![]; - let mut reservation: Vec<ExecutorReservation> = vec![]; + let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new(); + for status in task_status { + debug!("Task Update\n{:?}", status); + if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) { + let job_task_statuses = + job_updates.entry(job_id.clone()).or_insert_with(Vec::new); + job_task_statuses.push(status); + } else { + warn!("Received task with no job ID"); + } + } - let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new(); + let mut events: Vec<QueryStageSchedulerEvent> = vec![]; + let mut total_num_tasks = 0; + for (job_id, statuses) in job_updates { + let num_tasks = statuses.len(); + debug!("Updating {} tasks in job {}", num_tasks, job_id); - for status in task_status { - debug!("Task Update\n{:?}", status); - if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) { - if let Some(statuses) = job_updates.get_mut(job_id) { - statuses.push(status) - } else { - job_updates.insert(job_id.clone(), vec![status]); - } - } else { - warn!("Received task with no job ID"); - } - } + total_num_tasks += num_tasks; - let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![]; - - for (job_id, statuses) in job_updates { - let num_tasks = statuses.len(); - debug!("Updating {} tasks in job {}", num_tasks, job_id); - - let mut graph = self.get_execution_graph(&job_id).await?; - - graph.update_task_status(executor, statuses)?; - - if graph.complete() { - // If this ExecutionGraph is complete, finalize it - info!( - "Job {} is complete, finalizing output partitions", - graph.job_id() - ); - graph.finalize()?; - 
events.push(QueryStageSchedulerEvent::JobFinished(job_id.clone())); - - for _ in 0..num_tasks { - reservation - .push(ExecutorReservation::new_free(executor.id.to_owned())); - } - } else if let Some(job_status::Status::Failed(failure)) = - graph.status().status - { - events.push(QueryStageSchedulerEvent::JobFailed( - job_id.clone(), - failure.error, - )); - - for _ in 0..num_tasks { - reservation - .push(ExecutorReservation::new_free(executor.id.to_owned())); - } - } else { - // Otherwise keep the task slots reserved for this job - for _ in 0..num_tasks { - reservation.push(ExecutorReservation::new_assigned( - executor.id.to_owned(), - job_id.clone(), - )); - } - } + let graph = self.get_active_execution_graph(&job_id).await; + let job_event = if let Some(graph) = graph { + let mut graph = graph.write().await; + graph.update_task_status(executor, statuses)? + } else { + // TODO Deal with curator changed case + error!("Fail to find job {} in the active cache and it may not be curated by this scheduler", job_id); + None Review Comment: For now it seems like we should explicitly fail the job here so the job state gets updated. 
########## ballista/rust/scheduler/src/state/task_manager.rs: ########## @@ -37,16 +37,19 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_proto::logical_plan::AsLogicalPlan; -use log::{debug, info, warn}; +#[cfg(not(test))] +use log::info; +use log::{debug, error, warn}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::default::Default; use std::sync::Arc; use tokio::sync::RwLock; use tonic::transport::Channel; type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>; +type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>; #[derive(Clone)] pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> { Review Comment: This requires changes to the executors as well, right? Currently the executor will just have a single connection to the scheduler but it will need to accept the scheduler URL as part of the task definition for this to work correctly.
########## ballista/rust/scheduler/src/state/task_manager.rs: ########## @@ -37,16 +37,19 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_proto::logical_plan::AsLogicalPlan; -use log::{debug, info, warn}; +#[cfg(not(test))] +use log::info; +use log::{debug, error, warn}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::default::Default; use std::sync::Arc; use tokio::sync::RwLock; use tonic::transport::Channel; type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>; +type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>; Review Comment: What do you think about making this an `Arc<RwLock<VecDeque<(String, ExecutionGraph)>>>`? Lookup by job ID becomes `O(n)` but the number of active jobs shouldn't be prohibitive and you get a proper queue in return so we can do FIFO scheduling of new jobs. ########## ballista/rust/scheduler/src/state/execution_graph.rs: ########## @@ -15,61 +15,40 @@ // specific language governing permissions and limitations Review Comment: Should this entire file move to `src/state/execution_graph/mod.rs`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org