This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 74a8cfc0 feat: add custom task scheduling policy & make a lot of methods public (#1243)
74a8cfc0 is described below
commit 74a8cfc0c55b3b706611efe42bba5aa3980e587f
Author: Marko Milenković <[email protected]>
AuthorDate: Fri Apr 18 16:13:14 2025 +0100
feat: add custom task scheduling policy & make a lot of methods public (#1243)
---
ballista/scheduler/src/cluster/memory.rs | 3 +
ballista/scheduler/src/cluster/mod.rs | 66 +++++--
ballista/scheduler/src/config.rs | 6 +-
ballista/scheduler/src/scheduler_server/grpc.rs | 12 +-
ballista/scheduler/src/state/execution_graph.rs | 10 +-
.../state/{execution_graph => }/execution_stage.rs | 201 +++++++++++----------
ballista/scheduler/src/state/executor_manager.rs | 15 +-
ballista/scheduler/src/state/mod.rs | 1 +
8 files changed, 183 insertions(+), 131 deletions(-)
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 918a0b68..ecdf13d5 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -168,6 +168,9 @@ impl ClusterState for InMemoryClusterState {
}
bound_tasks
}
+ TaskDistributionPolicy::Custom(ref policy) => {
+ policy.bind_tasks(available_slots, active_jobs).await?
+ }
};
Ok(bound_tasks)
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 381bcdb4..6ca8884f 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -29,7 +29,7 @@ use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use futures::Stream;
-use log::{debug, info, warn};
+use log::debug;
use ballista_core::config::BallistaConfig;
use ballista_core::consistent_hash::ConsistentHash;
@@ -319,14 +319,14 @@ pub trait JobState: Send + Sync {
pub(crate) async fn bind_task_bias(
mut slots: Vec<&mut AvailableTaskSlots>,
- active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ running_jobs: Arc<HashMap<String, JobInfoCache>>,
if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
) -> Vec<BoundTask> {
let mut schedulable_tasks: Vec<BoundTask> = vec![];
let total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
if total_slots == 0 {
- warn!("Not enough available executor slots for task running!!!");
+ debug!("Not enough available executor slots for task running!!!");
return schedulable_tasks;
}
@@ -335,7 +335,7 @@ pub(crate) async fn bind_task_bias(
let mut idx_slot = 0usize;
let mut slot = &mut slots[idx_slot];
- for (job_id, job_info) in active_jobs.iter() {
+ for (job_id, job_info) in running_jobs.iter() {
if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
debug!(
"Job {} is not in running status and will be skipped",
@@ -350,7 +350,7 @@ pub(crate) async fn bind_task_bias(
graph.fetch_running_stage(&black_list)
{
if if_skip(running_stage.plan.clone()) {
- info!(
+ debug!(
"Will skip stage {}/{} for bias task binding",
job_id, running_stage.stage_id
);
@@ -406,23 +406,23 @@ pub(crate) async fn bind_task_bias(
pub(crate) async fn bind_task_round_robin(
mut slots: Vec<&mut AvailableTaskSlots>,
- active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ running_jobs: Arc<HashMap<String, JobInfoCache>>,
if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
) -> Vec<BoundTask> {
let mut schedulable_tasks: Vec<BoundTask> = vec![];
let mut total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
if total_slots == 0 {
- warn!("Not enough available executor slots for task running!!!");
+ debug!("Not enough available executor slots for task running!!!");
return schedulable_tasks;
}
- info!("Total slot number is {}", total_slots);
+ debug!("Total slot number is {}", total_slots);
// Sort the slots by descending order
slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
let mut idx_slot = 0usize;
- for (job_id, job_info) in active_jobs.iter() {
+ for (job_id, job_info) in running_jobs.iter() {
if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
debug!(
"Job {} is not in running status and will be skipped",
@@ -437,7 +437,7 @@ pub(crate) async fn bind_task_round_robin(
graph.fetch_running_stage(&black_list)
{
if if_skip(running_stage.plan.clone()) {
- info!(
+ debug!(
"Will skip stage {}/{} for round robin task binding",
job_id, running_stage.stage_id
);
@@ -498,16 +498,49 @@ pub(crate) async fn bind_task_round_robin(
schedulable_tasks
}
+/// Maps an execution plan to the list of files it scans
type GetScanFilesFunc = fn(
&str,
Arc<dyn ExecutionPlan>,
) -> datafusion::common::Result<Vec<Vec<Vec<PartitionedFile>>>>;
+/// User-provided task distribution policy
+#[async_trait::async_trait]
+pub trait DistributionPolicy: std::fmt::Debug + Send + Sync {
+ // A few open questions for later:
+ //
+ // - should the scheduling policy type be a parameter?
+ //   As consistent hashing shows, some policies do not work with
+ //   pull-based scheduling; or we find another way to address this concern
+ // - should we add `ClusterState` as a method parameter?
+ //
+
+ /// Binds ready-to-run tasks from the running jobs to the available executor slots.
+ ///
+ /// # Parameters
+ ///
+ /// * `slots` - vector of available executor slots; it may be empty
+ /// * `running_jobs` - (JobId -> JobInfoCache) cache; must contain only running jobs
+ ///
+ /// # Returns
+ ///
+ /// vector of task-to-executor bindings
+ ///
+ async fn bind_tasks(
+ &self,
+ mut slots: Vec<&mut AvailableTaskSlots>,
+ running_jobs: Arc<HashMap<String, JobInfoCache>>,
+ ) -> datafusion::error::Result<Vec<BoundTask>>;
+
+ /// Name of [DistributionPolicy]
+ fn name(&self) -> &str;
+}
+
pub(crate) async fn bind_task_consistent_hash(
topology_nodes: HashMap<String, TopologyNode>,
num_replicas: usize,
tolerance: usize,
- active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ running_jobs: Arc<HashMap<String, JobInfoCache>>,
get_scan_files: GetScanFilesFunc,
) -> Result<(Vec<BoundTask>, Option<ConsistentHash<TopologyNode>>)> {
let mut total_slots = 0usize;
@@ -515,10 +548,13 @@ pub(crate) async fn bind_task_consistent_hash(
total_slots += node.available_slots as usize;
}
if total_slots == 0 {
- info!("Not enough available executor slots for binding tasks with
consistent hashing policy!!!");
+ debug!("Not enough available executor slots for binding tasks with
consistent hashing policy!!!");
return Ok((vec![], None));
}
- info!("Total slot number is {}", total_slots);
+ debug!(
+ "Total slot number for consistent hash binding is {}",
+ total_slots
+ );
let node_replicas = topology_nodes
.into_values()
@@ -528,7 +564,7 @@ pub(crate) async fn bind_task_consistent_hash(
ConsistentHash::new(node_replicas);
let mut schedulable_tasks: Vec<BoundTask> = vec![];
- for (job_id, job_info) in active_jobs.iter() {
+ for (job_id, job_info) in running_jobs.iter() {
if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
debug!(
"Job {} is not in running status and will be skipped",
@@ -544,7 +580,7 @@ pub(crate) async fn bind_task_consistent_hash(
{
let scan_files = get_scan_files(job_id, running_stage.plan.clone())?;
if is_skip_consistent_hash(&scan_files) {
- info!(
+ debug!(
"Will skip stage {}/{} for consistent hashing task
binding",
job_id, running_stage.stage_id
);
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 2248e9b8..c9a034d2 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -18,6 +18,7 @@
//! Ballista scheduler specific configuration
+use crate::cluster::DistributionPolicy;
use crate::SessionBuilder;
use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -258,7 +259,7 @@ impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
}
}
-#[derive(Debug, Clone, Copy, Default)]
+#[derive(Debug, Clone, Default)]
pub enum TaskDistributionPolicy {
/// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
/// as are currently available
@@ -275,7 +276,10 @@ pub enum TaskDistributionPolicy {
num_replicas: usize,
tolerance: usize,
},
+ /// User-provided task distribution policy
+ Custom(Arc<dyn DistributionPolicy>),
}
+
#[cfg(feature = "build-binary")]
impl TryFrom<Config> for SchedulerConfig {
type Error = ballista_core::error::BallistaError;
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 02c21a88..697d3047 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -109,18 +109,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
slots: num_free_slots,
}];
let available_slots = available_slots.iter_mut().collect();
- let active_jobs = self.state.task_manager.get_running_job_cache();
+ let running_jobs = self.state.task_manager.get_running_job_cache();
let schedulable_tasks = match self.state.config.task_distribution {
TaskDistributionPolicy::Bias => {
- bind_task_bias(available_slots, active_jobs, |_| false).await
+ bind_task_bias(available_slots, running_jobs, |_| false).await
}
TaskDistributionPolicy::RoundRobin => {
- bind_task_round_robin(available_slots, active_jobs, |_| false).await
+ bind_task_round_robin(available_slots, running_jobs, |_| false).await
}
TaskDistributionPolicy::ConsistentHash{..} => {
return Err(Status::unimplemented(
"ConsistentHash TaskDistribution is not feasible for
pull-based task scheduling"))
}
+
+ TaskDistributionPolicy::Custom(ref policy) => {
+ policy.bind_tasks(available_slots, running_jobs).await.map_err(|e| {
+ Status::internal(e.to_string())
+ })?
+ }
};
let mut tasks = vec![];
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index a24e0f3c..d937760d 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -44,14 +44,12 @@ use crate::display::print_stage_metrics;
use crate::planner::DistributedPlanner;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::timestamp_millis;
-use crate::state::execution_graph::execution_stage::RunningStage;
-pub(crate) use crate::state::execution_graph::execution_stage::{
+use crate::state::execution_stage::RunningStage;
+pub(crate) use crate::state::execution_stage::{
ExecutionStage, ResolvedStage, StageOutput, TaskInfo, UnresolvedStage,
};
use crate::state::task_manager::UpdatedStages;
-mod execution_stage;
-
/// Represents the DAG for a distributed query plan.
///
/// A distributed query plan consists of a set of stages which must be executed sequentially.
@@ -932,7 +930,7 @@ impl ExecutionGraph {
Ok(next_task)
}
- pub(crate) fn fetch_running_stage(
+ pub fn fetch_running_stage(
&mut self,
black_list: &[usize],
) -> Option<(&mut RunningStage, &mut usize)> {
@@ -1333,7 +1331,7 @@ impl Debug for ExecutionGraph {
}
}
-pub(crate) fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
+pub fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
TaskInfo {
task_id,
scheduled_time: SystemTime::now()
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs
similarity index 88%
rename from ballista/scheduler/src/state/execution_graph/execution_stage.rs
rename to ballista/scheduler/src/state/execution_stage.rs
index 9d3a821b..1dd414f4 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_stage.rs
@@ -51,7 +51,7 @@ use crate::display::DisplayableBallistaExecutionPlan;
/// ↓
/// SuccessfulStage
#[derive(Clone)]
-pub(crate) enum ExecutionStage {
+pub enum ExecutionStage {
UnResolved(UnresolvedStage),
Resolved(ResolvedStage),
Running(RunningStage),
@@ -73,7 +73,7 @@ impl Debug for ExecutionStage {
impl ExecutionStage {
/// Get the name of the variant
- pub(crate) fn variant_name(&self) -> &str {
+ pub fn variant_name(&self) -> &str {
match self {
ExecutionStage::UnResolved(_) => "Unresolved",
ExecutionStage::Resolved(_) => "Resolved",
@@ -84,7 +84,7 @@ impl ExecutionStage {
}
/// Get the query plan for this query stage
- pub(crate) fn plan(&self) -> &dyn ExecutionPlan {
+ pub fn plan(&self) -> &dyn ExecutionPlan {
match self {
ExecutionStage::UnResolved(stage) => stage.plan.as_ref(),
ExecutionStage::Resolved(stage) => stage.plan.as_ref(),
@@ -97,47 +97,47 @@ impl ExecutionStage {
/// For a stage whose input stages are not all completed, we say it's an unresolved stage
#[derive(Clone)]
-pub(crate) struct UnresolvedStage {
+pub struct UnresolvedStage {
/// Stage ID
- pub(crate) stage_id: usize,
+ pub stage_id: usize,
/// Stage Attempt number
- pub(crate) stage_attempt_num: usize,
+ pub stage_attempt_num: usize,
/// Stage ID of the stage that will take this stage's outputs as inputs.
/// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
- pub(crate) output_links: Vec<usize>,
+ pub output_links: Vec<usize>,
/// Represents the outputs from this stage's child stages.
/// This stage can only be resolved and executed once all child stages are completed.
- pub(crate) inputs: HashMap<usize, StageOutput>,
+ pub inputs: HashMap<usize, StageOutput>,
/// `ExecutionPlan` for this stage
- pub(crate) plan: Arc<dyn ExecutionPlan>,
+ pub plan: Arc<dyn ExecutionPlan>,
/// Record last attempt's failure reasons to avoid duplicate resubmits
- pub(crate) last_attempt_failure_reasons: HashSet<String>,
-
- pub(crate) session_config: Arc<SessionConfig>,
+ pub last_attempt_failure_reasons: HashSet<String>,
+ /// [SessionConfig] used for this stage
+ pub session_config: Arc<SessionConfig>,
}
/// For a stage, if it has no inputs or all of its input stages are completed,
/// then we call it a resolved stage
#[derive(Clone)]
-pub(crate) struct ResolvedStage {
+pub struct ResolvedStage {
/// Stage ID
- pub(crate) stage_id: usize,
+ pub stage_id: usize,
/// Stage Attempt number
- pub(crate) stage_attempt_num: usize,
+ pub stage_attempt_num: usize,
/// Total number of partitions for this stage.
/// This stage will produce one task per partition.
- pub(crate) partitions: usize,
+ pub partitions: usize,
/// Stage ID of the stage that will take this stage's outputs as inputs.
/// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
- pub(crate) output_links: Vec<usize>,
+ pub output_links: Vec<usize>,
/// Represents the outputs from this stage's child stages.
- pub(crate) inputs: HashMap<usize, StageOutput>,
+ pub inputs: HashMap<usize, StageOutput>,
/// `ExecutionPlan` for this stage
- pub(crate) plan: Arc<dyn ExecutionPlan>,
+ pub plan: Arc<dyn ExecutionPlan>,
/// Record last attempt's failure reasons to avoid duplicate resubmits
- pub(crate) last_attempt_failure_reasons: HashSet<String>,
-
- pub(crate) session_config: Arc<SessionConfig>,
+ pub last_attempt_failure_reasons: HashSet<String>,
+ /// [SessionConfig] used for this stage
+ pub session_config: Arc<SessionConfig>,
}
/// Different from the resolved stage, a running stage will
@@ -146,107 +146,108 @@ pub(crate) struct ResolvedStage {
/// 3. manage the stage-level combined metrics
/// Running stages will only be maintained in memory and will not be saved to the backend storage
#[derive(Clone)]
-pub(crate) struct RunningStage {
+pub struct RunningStage {
/// Stage ID
- pub(crate) stage_id: usize,
+ pub stage_id: usize,
/// Stage Attempt number
- pub(crate) stage_attempt_num: usize,
+ pub stage_attempt_num: usize,
+ /// Stage activation time (when the stage became running) in millis
+ pub stage_running_time: u128,
/// Total number of partitions for this stage.
/// This stage will produce one task per partition.
- pub(crate) partitions: usize,
+ pub partitions: usize,
/// Stage ID of the stage that will take this stage's outputs as inputs.
/// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
- pub(crate) output_links: Vec<usize>,
+ pub output_links: Vec<usize>,
/// Represents the outputs from this stage's child stages.
- pub(crate) inputs: HashMap<usize, StageOutput>,
+ pub inputs: HashMap<usize, StageOutput>,
/// `ExecutionPlan` for this stage
- pub(crate) plan: Arc<dyn ExecutionPlan>,
+ pub plan: Arc<dyn ExecutionPlan>,
/// TaskInfo of each already scheduled task. If info is None, the partition has not yet been scheduled.
/// The index of the Vec is the task's partition id
- pub(crate) task_infos: Vec<Option<TaskInfo>>,
+ pub task_infos: Vec<Option<TaskInfo>>,
/// Track the number of failures for each partition's task attempts.
/// The index of the Vec is the task's partition id.
- pub(crate) task_failure_numbers: Vec<usize>,
+ pub task_failure_numbers: Vec<usize>,
/// Combined metrics of the already finished tasks in the stage. If it is None, no task is finished yet.
- pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
-
- pub(crate) session_config: Arc<SessionConfig>,
+ pub stage_metrics: Option<Vec<MetricsSet>>,
+ /// [SessionConfig] used for this stage
+ pub session_config: Arc<SessionConfig>,
}
/// If a stage finishes successfully, its task statuses and metrics will be finalized
#[derive(Clone)]
-pub(crate) struct SuccessfulStage {
+pub struct SuccessfulStage {
/// Stage ID
- pub(crate) stage_id: usize,
+ pub stage_id: usize,
/// Stage Attempt number
- pub(crate) stage_attempt_num: usize,
+ pub stage_attempt_num: usize,
/// Total number of partitions for this stage.
/// This stage will produce one task per partition.
- pub(crate) partitions: usize,
+ pub partitions: usize,
/// Stage ID of the stage that will take this stage's outputs as inputs.
/// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
- pub(crate) output_links: Vec<usize>,
+ pub output_links: Vec<usize>,
/// Represents the outputs from this stage's child stages.
- pub(crate) inputs: HashMap<usize, StageOutput>,
+ pub inputs: HashMap<usize, StageOutput>,
/// `ExecutionPlan` for this stage
- pub(crate) plan: Arc<dyn ExecutionPlan>,
+ pub plan: Arc<dyn ExecutionPlan>,
/// TaskInfo of each already successful task.
/// The index of the Vec is the task's partition id
- pub(crate) task_infos: Vec<TaskInfo>,
+ pub task_infos: Vec<TaskInfo>,
/// Combined metrics of the already finished tasks in the stage.
- pub(crate) stage_metrics: Vec<MetricsSet>,
-
- pub(crate) session_config: Arc<SessionConfig>,
+ pub stage_metrics: Vec<MetricsSet>,
+ /// [SessionConfig] used for this stage
+ pub session_config: Arc<SessionConfig>,
}
/// If a stage fails, it will carry an error message
#[derive(Clone)]
-pub(crate) struct FailedStage {
+pub struct FailedStage {
/// Stage ID
- pub(crate) stage_id: usize,
+ pub stage_id: usize,
/// Stage Attempt number
- pub(crate) stage_attempt_num: usize,
+ pub stage_attempt_num: usize,
/// Total number of partitions for this stage.
/// This stage will produce one task per partition.
- pub(crate) partitions: usize,
+ pub partitions: usize,
/// Stage ID of the stage that will take this stage's outputs as inputs.
/// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
#[allow(dead_code)] // not used at the moment, will be used later
- pub(crate) output_links: Vec<usize>,
+ pub output_links: Vec<usize>,
/// `ExecutionPlan` for this stage
- pub(crate) plan: Arc<dyn ExecutionPlan>,
+ pub plan: Arc<dyn ExecutionPlan>,
/// TaskInfo of each already scheduled task. If info is None, the partition has not yet been scheduled
/// The index of the Vec is the task's partition id
- pub(crate) task_infos: Vec<Option<TaskInfo>>,
+ pub task_infos: Vec<Option<TaskInfo>>,
/// Combined metrics of the already finished tasks in the stage. If it is None, no task is finished yet.
#[allow(dead_code)] // not used at the moment, will be used later
- pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
+ pub stage_metrics: Option<Vec<MetricsSet>>,
/// Error message
- pub(crate) error_message: String,
+ pub error_message: String,
}
#[derive(Clone)]
#[allow(dead_code)] // we may use the fields later
-pub(crate) struct TaskInfo {
+pub struct TaskInfo {
/// Task ID
- pub(super) task_id: usize,
+ pub task_id: usize,
/// Task scheduled time
- pub(super) scheduled_time: u128,
+ pub scheduled_time: u128,
/// Task launch time
- pub(super) launch_time: u128,
+ pub launch_time: u128,
/// Start execution time
- pub(super) start_exec_time: u128,
+ pub start_exec_time: u128,
/// Finish execution time
- pub(super) end_exec_time: u128,
+ pub end_exec_time: u128,
/// Task finish time
- pub(super) finish_time: u128,
+ pub finish_time: u128,
/// Task Status
- pub(super) task_status: task_status::Status,
- //pub(crate) session_config: Arc<SessionConfig>,
+ pub task_status: task_status::Status,
}
impl UnresolvedStage {
- pub(super) fn new(
+ pub fn new(
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
output_links: Vec<usize>,
@@ -269,7 +270,7 @@ impl UnresolvedStage {
}
}
- pub(super) fn new_with_inputs(
+ pub fn new_with_inputs(
stage_id: usize,
stage_attempt_num: usize,
plan: Arc<dyn ExecutionPlan>,
@@ -290,7 +291,7 @@ impl UnresolvedStage {
}
/// Add input partitions published from an input stage.
- pub(super) fn add_input_partitions(
+ pub fn add_input_partitions(
&mut self,
stage_id: usize,
locations: Vec<PartitionLocation>,
@@ -308,7 +309,7 @@ impl UnresolvedStage {
/// Remove input partitions from an input stage on a given executor.
/// Return the HashSet of removed map partition ids
- pub(super) fn remove_input_partitions(
+ pub fn remove_input_partitions(
&mut self,
input_stage_id: usize,
_input_partition_id: usize,
@@ -336,7 +337,7 @@ impl UnresolvedStage {
}
/// Marks the input stage ID as complete.
- pub(super) fn complete_input(&mut self, stage_id: usize) {
+ pub fn complete_input(&mut self, stage_id: usize) {
if let Some(input) = self.inputs.get_mut(&stage_id) {
input.complete = true;
}
@@ -344,12 +345,12 @@ impl UnresolvedStage {
/// Returns true if all inputs are complete and we can resolve all
/// UnresolvedShuffleExec operators to ShuffleReadExec
- pub(super) fn resolvable(&self) -> bool {
+ pub fn resolvable(&self) -> bool {
self.inputs.iter().all(|(_, input)| input.is_complete())
}
/// Change to the resolved state
- pub(super) fn to_resolved(&self) -> Result<ResolvedStage> {
+ pub fn to_resolved(&self) -> Result<ResolvedStage> {
let input_locations = self
.inputs
.iter()
@@ -397,7 +398,7 @@ impl Debug for UnresolvedStage {
}
impl ResolvedStage {
- pub(super) fn new(
+ pub fn new(
stage_id: usize,
stage_attempt_num: usize,
plan: Arc<dyn ExecutionPlan>,
@@ -421,7 +422,7 @@ impl ResolvedStage {
}
/// Change to the running state
- pub(super) fn to_running(&self) -> RunningStage {
+ pub fn to_running(&self) -> RunningStage {
RunningStage::new(
self.stage_id,
self.stage_attempt_num,
@@ -434,7 +435,7 @@ impl ResolvedStage {
}
/// Change to the unresolved state
- pub(super) fn to_unresolved(&self) -> Result<UnresolvedStage> {
+ pub fn to_unresolved(&self) -> Result<UnresolvedStage> {
let new_plan =
crate::planner::rollback_resolved_shuffles(self.plan.clone())?;
let unresolved = UnresolvedStage::new_with_inputs(
@@ -463,7 +464,7 @@ impl Debug for ResolvedStage {
}
impl RunningStage {
- pub(super) fn new(
+ pub fn new(
stage_id: usize,
stage_attempt_num: usize,
plan: Arc<dyn ExecutionPlan>,
@@ -475,6 +476,10 @@ impl RunningStage {
Self {
stage_id,
stage_attempt_num,
+ stage_running_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
partitions,
output_links,
inputs,
@@ -486,7 +491,7 @@ impl RunningStage {
}
}
- pub(super) fn to_successful(&self) -> SuccessfulStage {
+ pub fn to_successful(&self) -> SuccessfulStage {
let task_infos = self
.task_infos
.iter()
@@ -517,7 +522,7 @@ impl RunningStage {
}
}
- pub(super) fn to_failed(&self, error_message: String) -> FailedStage {
+ pub fn to_failed(&self, error_message: String) -> FailedStage {
FailedStage {
stage_id: self.stage_id,
stage_attempt_num: self.stage_attempt_num,
@@ -531,7 +536,7 @@ impl RunningStage {
}
/// Change to the unresolved state and bump the stage attempt number
- pub(super) fn to_unresolved(
+ pub fn to_unresolved(
&self,
failure_reasons: HashSet<String>,
) -> Result<UnresolvedStage> {
@@ -550,7 +555,7 @@ impl RunningStage {
}
/// Returns `true` if all tasks for this stage are successful
- pub(super) fn is_successful(&self) -> bool {
+ pub fn is_successful(&self) -> bool {
self.task_infos.iter().all(|info| {
matches!(
info,
@@ -563,7 +568,7 @@ impl RunningStage {
}
/// Returns the number of successful tasks
- pub(super) fn successful_tasks(&self) -> usize {
+ pub fn successful_tasks(&self) -> usize {
self.task_infos
.iter()
.filter(|info| {
@@ -579,12 +584,12 @@ impl RunningStage {
}
/// Returns the number of scheduled tasks
- pub(super) fn scheduled_tasks(&self) -> usize {
+ pub fn scheduled_tasks(&self) -> usize {
self.task_infos.iter().filter(|s| s.is_some()).count()
}
/// Returns a vector of currently running tasks in this stage
- pub(super) fn running_tasks(&self) -> Vec<(usize, usize, usize, String)> {
+ pub fn running_tasks(&self) -> Vec<(usize, usize, usize, String)> {
self.task_infos
.iter()
.enumerate()
@@ -601,16 +606,12 @@ impl RunningStage {
/// Returns the number of tasks in this stage which are available for scheduling.
/// If the stage is not yet resolved, then this will return `0`, otherwise it will
/// return the number of tasks where the task info is not yet set.
- pub(super) fn available_tasks(&self) -> usize {
+ pub fn available_tasks(&self) -> usize {
self.task_infos.iter().filter(|s| s.is_none()).count()
}
/// Update the TaskInfo for task partition
- pub(super) fn update_task_info(
- &mut self,
- partition_id: usize,
- status: TaskStatus,
- ) -> bool {
+ pub fn update_task_info(&mut self, partition_id: usize, status: TaskStatus) -> bool {
debug!("Updating TaskInfo for partition {}", partition_id);
let task_info = self.task_infos[partition_id].as_ref().unwrap();
let task_id = task_info.task_id;
@@ -647,7 +648,7 @@ impl RunningStage {
}
/// update and combine the task metrics to the stage metrics
- pub(super) fn update_task_metrics(
+ pub fn update_task_metrics(
&mut self,
partition: usize,
metrics: Vec<OperatorMetricsSet>,
@@ -690,7 +691,7 @@ impl RunningStage {
Ok(())
}
- pub(super) fn combine_metrics_set(
+ pub fn combine_metrics_set(
first: &mut MetricsSet,
second: Vec<MetricValue>,
partition: usize,
@@ -703,7 +704,7 @@ impl RunningStage {
first.aggregate_by_name()
}
- pub(super) fn task_failure_number(&self, partition_id: usize) -> usize {
+ pub fn task_failure_number(&self, partition_id: usize) -> usize {
self.task_failure_numbers[partition_id]
}
@@ -745,7 +746,7 @@ impl RunningStage {
/// Remove input partitions from an input stage on a given executor.
/// Return the HashSet of removed map partition ids
- pub(super) fn remove_input_partitions(
+ pub fn remove_input_partitions(
&mut self,
input_stage_id: usize,
_input_partition_id: usize,
@@ -821,6 +822,10 @@ impl SuccessfulStage {
task_failure_numbers: vec![0; self.partitions],
stage_metrics,
session_config: self.session_config.clone(),
+ stage_running_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
}
}
@@ -881,7 +886,7 @@ impl Debug for SuccessfulStage {
impl FailedStage {
/// Returns the number of successful tasks
- pub(super) fn successful_tasks(&self) -> usize {
+ pub fn successful_tasks(&self) -> usize {
self.task_infos
.iter()
.filter(|info| {
@@ -896,14 +901,14 @@ impl FailedStage {
.count()
}
/// Returns the number of scheduled tasks
- pub(super) fn scheduled_tasks(&self) -> usize {
+ pub fn scheduled_tasks(&self) -> usize {
self.task_infos.iter().filter(|s| s.is_some()).count()
}
/// Returns the number of tasks in this stage which are available for scheduling.
/// If the stage is not yet resolved, then this will return `0`, otherwise it will
/// return the number of tasks where the task status is not yet set.
- pub(super) fn available_tasks(&self) -> usize {
+ pub fn available_tasks(&self) -> usize {
self.task_infos.iter().filter(|s| s.is_none()).count()
}
}
@@ -942,7 +947,7 @@ fn get_stage_partitions(plan: Arc<dyn ExecutionPlan>) -> usize {
/// When all tasks for the child stage are complete, it will mark the `StageOutput`
/// as complete.
#[derive(Clone, Debug, Default)]
-pub(crate) struct StageOutput {
+pub struct StageOutput {
/// Map from partition -> partition locations
pub partition_locations: HashMap<usize, Vec<PartitionLocation>>,
/// Flag indicating whether all tasks are complete
@@ -950,7 +955,7 @@ pub(crate) struct StageOutput {
}
impl StageOutput {
- pub(super) fn new() -> Self {
+ pub fn new() -> Self {
Self {
partition_locations: HashMap::new(),
complete: false,
@@ -958,7 +963,7 @@ impl StageOutput {
}
/// Add a `PartitionLocation` to the `StageOutput`
- pub(super) fn add_partition(&mut self, partition_location: PartitionLocation) {
+ pub fn add_partition(&mut self, partition_location: PartitionLocation) {
if let Some(parts) = self
.partition_locations
.get_mut(&partition_location.partition_id.partition_id)
@@ -972,7 +977,7 @@ impl StageOutput {
}
}
- pub(super) fn is_complete(&self) -> bool {
+ pub fn is_complete(&self) -> bool {
self.complete
}
}
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index ba49149c..9c4c5acb 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -66,26 +66,25 @@ impl ExecutorManager {
Ok(())
}
- /// Bind the ready to running tasks from [`active_jobs`] with available executors.
///
- /// If `executors` is provided, only bind slots from the specified executor IDs
+ /// Bind the ready-to-run tasks from [`running_jobs`] to available executors.
pub async fn bind_schedulable_tasks(
&self,
- active_jobs: Arc<HashMap<String, JobInfoCache>>,
+ running_jobs: Arc<HashMap<String, JobInfoCache>>,
) -> Result<Vec<BoundTask>> {
- if active_jobs.is_empty() {
- warn!("There's no active jobs for binding tasks");
+ if running_jobs.is_empty() {
+ debug!("There's no active jobs for binding tasks");
return Ok(vec![]);
}
let alive_executors = self.get_alive_executors();
if alive_executors.is_empty() {
- warn!("There's no alive executors for binding tasks");
+ debug!("There's no alive executors for binding tasks");
return Ok(vec![]);
}
self.cluster_state
.bind_schedulable_tasks(
- self.config.task_distribution,
- active_jobs,
+ self.config.task_distribution.clone(),
+ running_jobs,
Some(alive_executors),
)
.await
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 4394dc00..c628aaf2 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -49,6 +49,7 @@ use prost::Message;
pub mod execution_graph;
pub mod execution_graph_dot;
+pub mod execution_stage;
pub mod executor_manager;
pub mod session_manager;
pub mod task_manager;
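
For anyone wiring up the new extension point, here is a minimal sketch (not part of this commit): the `NoopPolicy` type, the `scheduler_config` helper, and the exact import paths are illustrative assumptions, and it assumes `SchedulerConfig` implements `Default`.

use std::collections::HashMap;
use std::sync::Arc;

use ballista_core::serde::protobuf::AvailableTaskSlots;
use ballista_scheduler::cluster::{BoundTask, DistributionPolicy};
use ballista_scheduler::config::{SchedulerConfig, TaskDistributionPolicy};
use ballista_scheduler::state::task_manager::JobInfoCache;

/// Illustrative policy that binds nothing; a real implementation would walk
/// `running_jobs`, pick runnable stage tasks, and consume entries in `slots`.
#[derive(Debug)]
struct NoopPolicy;

#[async_trait::async_trait]
impl DistributionPolicy for NoopPolicy {
    async fn bind_tasks(
        &self,
        slots: Vec<&mut AvailableTaskSlots>,
        running_jobs: Arc<HashMap<String, JobInfoCache>>,
    ) -> datafusion::error::Result<Vec<BoundTask>> {
        // Custom binding logic goes here; returning an empty vector simply
        // schedules no tasks in this round.
        let _ = (slots, running_jobs);
        Ok(Vec::new())
    }

    fn name(&self) -> &str {
        "noop"
    }
}

// Hypothetical wiring: hand the policy to the scheduler configuration.
fn scheduler_config() -> SchedulerConfig {
    SchedulerConfig {
        task_distribution: TaskDistributionPolicy::Custom(Arc::new(NoopPolicy)),
        ..Default::default()
    }
}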
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]