This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 74a8cfc0 feat: add custom task scheduling policy & make a lot of methods public (#1243)
74a8cfc0 is described below

commit 74a8cfc0c55b3b706611efe42bba5aa3980e587f
Author: Marko Milenković <[email protected]>
AuthorDate: Fri Apr 18 16:13:14 2025 +0100

    feat: add custom task scheduling policy & make a lot of methods public (#1243)
---
 ballista/scheduler/src/cluster/memory.rs           |   3 +
 ballista/scheduler/src/cluster/mod.rs              |  66 +++++--
 ballista/scheduler/src/config.rs                   |   6 +-
 ballista/scheduler/src/scheduler_server/grpc.rs    |  12 +-
 ballista/scheduler/src/state/execution_graph.rs    |  10 +-
 .../state/{execution_graph => }/execution_stage.rs | 201 +++++++++++----------
 ballista/scheduler/src/state/executor_manager.rs   |  15 +-
 ballista/scheduler/src/state/mod.rs                |   1 +
 8 files changed, 183 insertions(+), 131 deletions(-)

diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index 918a0b68..ecdf13d5 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -168,6 +168,9 @@ impl ClusterState for InMemoryClusterState {
                 }
                 bound_tasks
             }
+            TaskDistributionPolicy::Custom(ref policy) => {
+                policy.bind_tasks(available_slots, active_jobs).await?
+            }
         };
 
         Ok(bound_tasks)
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 381bcdb4..6ca8884f 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -29,7 +29,7 @@ use datafusion::error::DataFusionError;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::Stream;
-use log::{debug, info, warn};
+use log::debug;
 
 use ballista_core::config::BallistaConfig;
 use ballista_core::consistent_hash::ConsistentHash;
@@ -319,14 +319,14 @@ pub trait JobState: Send + Sync {
 
 pub(crate) async fn bind_task_bias(
     mut slots: Vec<&mut AvailableTaskSlots>,
-    active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    running_jobs: Arc<HashMap<String, JobInfoCache>>,
     if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
 ) -> Vec<BoundTask> {
     let mut schedulable_tasks: Vec<BoundTask> = vec![];
 
     let total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
     if total_slots == 0 {
-        warn!("Not enough available executor slots for task running!!!");
+        debug!("Not enough available executor slots for task running!!!");
         return schedulable_tasks;
     }
 
@@ -335,7 +335,7 @@ pub(crate) async fn bind_task_bias(
 
     let mut idx_slot = 0usize;
     let mut slot = &mut slots[idx_slot];
-    for (job_id, job_info) in active_jobs.iter() {
+    for (job_id, job_info) in running_jobs.iter() {
         if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
             debug!(
                 "Job {} is not in running status and will be skipped",
@@ -350,7 +350,7 @@ pub(crate) async fn bind_task_bias(
             graph.fetch_running_stage(&black_list)
         {
             if if_skip(running_stage.plan.clone()) {
-                info!(
+                debug!(
                     "Will skip stage {}/{} for bias task binding",
                     job_id, running_stage.stage_id
                 );
@@ -406,23 +406,23 @@ pub(crate) async fn bind_task_bias(
 
 pub(crate) async fn bind_task_round_robin(
     mut slots: Vec<&mut AvailableTaskSlots>,
-    active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    running_jobs: Arc<HashMap<String, JobInfoCache>>,
     if_skip: fn(Arc<dyn ExecutionPlan>) -> bool,
 ) -> Vec<BoundTask> {
     let mut schedulable_tasks: Vec<BoundTask> = vec![];
 
     let mut total_slots = slots.iter().fold(0, |acc, s| acc + s.slots);
     if total_slots == 0 {
-        warn!("Not enough available executor slots for task running!!!");
+        debug!("Not enough available executor slots for task running!!!");
         return schedulable_tasks;
     }
-    info!("Total slot number is {}", total_slots);
+    debug!("Total slot number is {}", total_slots);
 
     // Sort the slots by descending order
     slots.sort_by(|a, b| Ord::cmp(&b.slots, &a.slots));
 
     let mut idx_slot = 0usize;
-    for (job_id, job_info) in active_jobs.iter() {
+    for (job_id, job_info) in running_jobs.iter() {
         if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
             debug!(
                 "Job {} is not in running status and will be skipped",
@@ -437,7 +437,7 @@ pub(crate) async fn bind_task_round_robin(
             graph.fetch_running_stage(&black_list)
         {
             if if_skip(running_stage.plan.clone()) {
-                info!(
+                debug!(
                     "Will skip stage {}/{} for round robin task binding",
                     job_id, running_stage.stage_id
                 );
@@ -498,16 +498,49 @@ pub(crate) async fn bind_task_round_robin(
     schedulable_tasks
 }
 
+/// Maps execution plan to list of files it scans
 type GetScanFilesFunc = fn(
     &str,
     Arc<dyn ExecutionPlan>,
 ) -> datafusion::common::Result<Vec<Vec<Vec<PartitionedFile>>>>;
 
+/// User provided task distribution policy
+#[async_trait::async_trait]
+pub trait DistributionPolicy: std::fmt::Debug + Send + Sync {
+    // few open questions for later:
+    //
+    // - should scheduling policy type be a parameter
+    //   as we see in the consistent hash, it does not work in
+    //   pull based. Or we find another way to address this concern
+    // - should we add `ClusterState` as method parameter
+    //
+
+    /// User provided custom task distribution policy
+    ///
+    /// # Parameters
+    ///
+    /// * `slots` - vector of available executor slots, there may not be available slots
+    /// * `running_jobs` - (JobId -> JobInfoCache) cache must contain only running jobs
+    ///
+    /// # Returns
+    ///
+    /// vector of task, executor bounding
+    ///
+    async fn bind_tasks(
+        &self,
+        mut slots: Vec<&mut AvailableTaskSlots>,
+        running_jobs: Arc<HashMap<String, JobInfoCache>>,
+    ) -> datafusion::error::Result<Vec<BoundTask>>;
+
+    /// Name of [DistributionPolicy]
+    fn name(&self) -> &str;
+}
+
 pub(crate) async fn bind_task_consistent_hash(
     topology_nodes: HashMap<String, TopologyNode>,
     num_replicas: usize,
     tolerance: usize,
-    active_jobs: Arc<HashMap<String, JobInfoCache>>,
+    running_jobs: Arc<HashMap<String, JobInfoCache>>,
     get_scan_files: GetScanFilesFunc,
 ) -> Result<(Vec<BoundTask>, Option<ConsistentHash<TopologyNode>>)> {
     let mut total_slots = 0usize;
@@ -515,10 +548,13 @@ pub(crate) async fn bind_task_consistent_hash(
         total_slots += node.available_slots as usize;
     }
     if total_slots == 0 {
-        info!("Not enough available executor slots for binding tasks with consistent hashing policy!!!");
+        debug!("Not enough available executor slots for binding tasks with consistent hashing policy!!!");
         return Ok((vec![], None));
     }
-    info!("Total slot number is {}", total_slots);
+    debug!(
+        "Total slot number for consistent hash binding is {}",
+        total_slots
+    );
 
     let node_replicas = topology_nodes
         .into_values()
@@ -528,7 +564,7 @@ pub(crate) async fn bind_task_consistent_hash(
         ConsistentHash::new(node_replicas);
 
     let mut schedulable_tasks: Vec<BoundTask> = vec![];
-    for (job_id, job_info) in active_jobs.iter() {
+    for (job_id, job_info) in running_jobs.iter() {
         if !matches!(job_info.status, Some(job_status::Status::Running(_))) {
             debug!(
                 "Job {} is not in running status and will be skipped",
@@ -544,7 +580,7 @@ pub(crate) async fn bind_task_consistent_hash(
         {
             let scan_files = get_scan_files(job_id, 
running_stage.plan.clone())?;
             if is_skip_consistent_hash(&scan_files) {
-                info!(
+                debug!(
                     "Will skip stage {}/{} for consistent hashing task 
binding",
                     job_id, running_stage.stage_id
                 );
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 2248e9b8..c9a034d2 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -18,6 +18,7 @@
 
 //! Ballista scheduler specific configuration
 
+use crate::cluster::DistributionPolicy;
 use crate::SessionBuilder;
 use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -258,7 +259,7 @@ impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
     }
 }
 
-#[derive(Debug, Clone, Copy, Default)]
+#[derive(Debug, Clone, Default)]
 pub enum TaskDistributionPolicy {
     /// Eagerly assign tasks to executor slots. This will assign as many task 
slots per executor
     /// as are currently available
@@ -275,7 +276,10 @@ pub enum TaskDistributionPolicy {
         num_replicas: usize,
         tolerance: usize,
     },
+    /// User provided task distribution policy
+    Custom(Arc<dyn DistributionPolicy>),
 }
+
 #[cfg(feature = "build-binary")]
 impl TryFrom<Config> for SchedulerConfig {
     type Error = ballista_core::error::BallistaError;
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 02c21a88..697d3047 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -109,18 +109,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                 slots: num_free_slots,
             }];
             let available_slots = available_slots.iter_mut().collect();
-            let active_jobs = self.state.task_manager.get_running_job_cache();
+            let running_jobs = self.state.task_manager.get_running_job_cache();
             let schedulable_tasks = match self.state.config.task_distribution {
                 TaskDistributionPolicy::Bias => {
-                    bind_task_bias(available_slots, active_jobs, |_| false).await
+                    bind_task_bias(available_slots, running_jobs, |_| false).await
                 }
                 TaskDistributionPolicy::RoundRobin => {
-                    bind_task_round_robin(available_slots, active_jobs, |_| false).await
+                    bind_task_round_robin(available_slots, running_jobs, |_| false).await
                 }
                 TaskDistributionPolicy::ConsistentHash{..} => {
                     return Err(Status::unimplemented(
                         "ConsistentHash TaskDistribution is not feasible for 
pull-based task scheduling"))
                 }
+
+                TaskDistributionPolicy::Custom(ref policy) =>{
+                    policy.bind_tasks(available_slots, running_jobs).await.map_err(|e| {
+                        Status::internal(e.to_string())
+                    })?
+                }
             };
 
             let mut tasks = vec![];
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index a24e0f3c..d937760d 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -44,14 +44,12 @@ use crate::display::print_stage_metrics;
 use crate::planner::DistributedPlanner;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::timestamp_millis;
-use crate::state::execution_graph::execution_stage::RunningStage;
-pub(crate) use crate::state::execution_graph::execution_stage::{
+use crate::state::execution_stage::RunningStage;
+pub(crate) use crate::state::execution_stage::{
     ExecutionStage, ResolvedStage, StageOutput, TaskInfo, UnresolvedStage,
 };
 use crate::state::task_manager::UpdatedStages;
 
-mod execution_stage;
-
 /// Represents the DAG for a distributed query plan.
 ///
 /// A distributed query plan consists of a set of stages which must be 
executed sequentially.
@@ -932,7 +930,7 @@ impl ExecutionGraph {
         Ok(next_task)
     }
 
-    pub(crate) fn fetch_running_stage(
+    pub fn fetch_running_stage(
         &mut self,
         black_list: &[usize],
     ) -> Option<(&mut RunningStage, &mut usize)> {
@@ -1333,7 +1331,7 @@ impl Debug for ExecutionGraph {
     }
 }
 
-pub(crate) fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
+pub fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
     TaskInfo {
         task_id,
         scheduled_time: SystemTime::now()
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs
similarity index 88%
rename from ballista/scheduler/src/state/execution_graph/execution_stage.rs
rename to ballista/scheduler/src/state/execution_stage.rs
index 9d3a821b..1dd414f4 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_stage.rs
@@ -51,7 +51,7 @@ use crate::display::DisplayableBallistaExecutionPlan;
 ///                                ↓
 ///                         SuccessfulStage
 #[derive(Clone)]
-pub(crate) enum ExecutionStage {
+pub enum ExecutionStage {
     UnResolved(UnresolvedStage),
     Resolved(ResolvedStage),
     Running(RunningStage),
@@ -73,7 +73,7 @@ impl Debug for ExecutionStage {
 
 impl ExecutionStage {
     /// Get the name of the variant
-    pub(crate) fn variant_name(&self) -> &str {
+    pub fn variant_name(&self) -> &str {
         match self {
             ExecutionStage::UnResolved(_) => "Unresolved",
             ExecutionStage::Resolved(_) => "Resolved",
@@ -84,7 +84,7 @@ impl ExecutionStage {
     }
 
     /// Get the query plan for this query stage
-    pub(crate) fn plan(&self) -> &dyn ExecutionPlan {
+    pub fn plan(&self) -> &dyn ExecutionPlan {
         match self {
             ExecutionStage::UnResolved(stage) => stage.plan.as_ref(),
             ExecutionStage::Resolved(stage) => stage.plan.as_ref(),
@@ -97,47 +97,47 @@ impl ExecutionStage {
 
 /// For a stage whose input stages are not all completed, we say it's a 
unresolved stage
 #[derive(Clone)]
-pub(crate) struct UnresolvedStage {
+pub struct UnresolvedStage {
     /// Stage ID
-    pub(crate) stage_id: usize,
+    pub stage_id: usize,
     /// Stage Attempt number
-    pub(crate) stage_attempt_num: usize,
+    pub stage_attempt_num: usize,
     /// Stage ID of the stage that will take this stages outputs as inputs.
     /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
-    pub(crate) output_links: Vec<usize>,
+    pub output_links: Vec<usize>,
     /// Represents the outputs from this stage's child stages.
     /// This stage can only be resolved an executed once all child stages are 
completed.
-    pub(crate) inputs: HashMap<usize, StageOutput>,
+    pub inputs: HashMap<usize, StageOutput>,
     /// `ExecutionPlan` for this stage
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub plan: Arc<dyn ExecutionPlan>,
     /// Record last attempt's failure reasons to avoid duplicate resubmits
-    pub(crate) last_attempt_failure_reasons: HashSet<String>,
-
-    pub(crate) session_config: Arc<SessionConfig>,
+    pub last_attempt_failure_reasons: HashSet<String>,
+    /// [SessionConfig] used for this stage
+    pub session_config: Arc<SessionConfig>,
 }
 
 /// For a stage, if it has no inputs or all of its input stages are completed,
 /// then we call it as a resolved stage
 #[derive(Clone)]
-pub(crate) struct ResolvedStage {
+pub struct ResolvedStage {
     /// Stage ID
-    pub(crate) stage_id: usize,
+    pub stage_id: usize,
     /// Stage Attempt number
-    pub(crate) stage_attempt_num: usize,
+    pub stage_attempt_num: usize,
     /// Total number of partitions for this stage.
     /// This stage will produce on task for partition.
-    pub(crate) partitions: usize,
+    pub partitions: usize,
     /// Stage ID of the stage that will take this stages outputs as inputs.
     /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
-    pub(crate) output_links: Vec<usize>,
+    pub output_links: Vec<usize>,
     /// Represents the outputs from this stage's child stages.
-    pub(crate) inputs: HashMap<usize, StageOutput>,
+    pub inputs: HashMap<usize, StageOutput>,
     /// `ExecutionPlan` for this stage
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub plan: Arc<dyn ExecutionPlan>,
     /// Record last attempt's failure reasons to avoid duplicate resubmits
-    pub(crate) last_attempt_failure_reasons: HashSet<String>,
-
-    pub(crate) session_config: Arc<SessionConfig>,
+    pub last_attempt_failure_reasons: HashSet<String>,
+    /// [SessionConfig] used for this stage
+    pub session_config: Arc<SessionConfig>,
 }
 
 /// Different from the resolved stage, a running stage will
@@ -146,107 +146,108 @@ pub(crate) struct ResolvedStage {
 /// 3. manage the stage-level combined metrics
 ///    Running stages will only be maintained in memory and will not saved to 
the backend storage
 #[derive(Clone)]
-pub(crate) struct RunningStage {
+pub struct RunningStage {
     /// Stage ID
-    pub(crate) stage_id: usize,
+    pub stage_id: usize,
     /// Stage Attempt number
-    pub(crate) stage_attempt_num: usize,
+    pub stage_attempt_num: usize,
+    /// Stage activation time (when was stage become running) in millis
+    pub stage_running_time: u128,
     /// Total number of partitions for this stage.
     /// This stage will produce on task for partition.
-    pub(crate) partitions: usize,
+    pub partitions: usize,
     /// Stage ID of the stage that will take this stages outputs as inputs.
     /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
-    pub(crate) output_links: Vec<usize>,
+    pub output_links: Vec<usize>,
     /// Represents the outputs from this stage's child stages.
-    pub(crate) inputs: HashMap<usize, StageOutput>,
+    pub inputs: HashMap<usize, StageOutput>,
     /// `ExecutionPlan` for this stage
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub plan: Arc<dyn ExecutionPlan>,
     /// TaskInfo of each already scheduled task. If info is None, the 
partition has not yet been scheduled.
     /// The index of the Vec is the task's partition id
-    pub(crate) task_infos: Vec<Option<TaskInfo>>,
+    pub task_infos: Vec<Option<TaskInfo>>,
     /// Track the number of failures for each partition's task attempts.
     /// The index of the Vec is the task's partition id.
-    pub(crate) task_failure_numbers: Vec<usize>,
+    pub task_failure_numbers: Vec<usize>,
     /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
-    pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
-
-    pub(crate) session_config: Arc<SessionConfig>,
+    pub stage_metrics: Option<Vec<MetricsSet>>,
+    /// [SessionConfig] used for this stage
+    pub session_config: Arc<SessionConfig>,
 }
 
 /// If a stage finishes successfully, its task statuses and metrics will be 
finalized
 #[derive(Clone)]
-pub(crate) struct SuccessfulStage {
+pub struct SuccessfulStage {
     /// Stage ID
-    pub(crate) stage_id: usize,
+    pub stage_id: usize,
     /// Stage Attempt number
-    pub(crate) stage_attempt_num: usize,
+    pub stage_attempt_num: usize,
     /// Total number of partitions for this stage.
     /// This stage will produce on task for partition.
-    pub(crate) partitions: usize,
+    pub partitions: usize,
     /// Stage ID of the stage that will take this stages outputs as inputs.
     /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
-    pub(crate) output_links: Vec<usize>,
+    pub output_links: Vec<usize>,
     /// Represents the outputs from this stage's child stages.
-    pub(crate) inputs: HashMap<usize, StageOutput>,
+    pub inputs: HashMap<usize, StageOutput>,
     /// `ExecutionPlan` for this stage
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub plan: Arc<dyn ExecutionPlan>,
     /// TaskInfo of each already successful task.
     /// The index of the Vec is the task's partition id
-    pub(crate) task_infos: Vec<TaskInfo>,
+    pub task_infos: Vec<TaskInfo>,
     /// Combined metrics of the already finished tasks in the stage.
-    pub(crate) stage_metrics: Vec<MetricsSet>,
-
-    pub(crate) session_config: Arc<SessionConfig>,
+    pub stage_metrics: Vec<MetricsSet>,
+    /// [SessionConfig] used for this stage
+    pub session_config: Arc<SessionConfig>,
 }
 
 /// If a stage fails, it will be with an error message
 #[derive(Clone)]
-pub(crate) struct FailedStage {
+pub struct FailedStage {
     /// Stage ID
-    pub(crate) stage_id: usize,
+    pub stage_id: usize,
     /// Stage Attempt number
-    pub(crate) stage_attempt_num: usize,
+    pub stage_attempt_num: usize,
     /// Total number of partitions for this stage.
     /// This stage will produce on task for partition.
-    pub(crate) partitions: usize,
+    pub partitions: usize,
     /// Stage ID of the stage that will take this stages outputs as inputs.
     /// If `output_links` is empty then this the final stage in the 
`ExecutionGraph`
     #[allow(dead_code)] // not used at the moment, will be used later
-    pub(crate) output_links: Vec<usize>,
+    pub output_links: Vec<usize>,
     /// `ExecutionPlan` for this stage
-    pub(crate) plan: Arc<dyn ExecutionPlan>,
+    pub plan: Arc<dyn ExecutionPlan>,
     /// TaskInfo of each already scheduled tasks. If info is None, the 
partition has not yet been scheduled
     /// The index of the Vec is the task's partition id
-    pub(crate) task_infos: Vec<Option<TaskInfo>>,
+    pub task_infos: Vec<Option<TaskInfo>>,
     /// Combined metrics of the already finished tasks in the stage, If it is 
None, no task is finished yet.
     #[allow(dead_code)] // not used at the moment, will be used later
-    pub(crate) stage_metrics: Option<Vec<MetricsSet>>,
+    pub stage_metrics: Option<Vec<MetricsSet>>,
     /// Error message
-    pub(crate) error_message: String,
+    pub error_message: String,
 }
 
 #[derive(Clone)]
 #[allow(dead_code)] // we may use the fields later
-pub(crate) struct TaskInfo {
+pub struct TaskInfo {
     /// Task ID
-    pub(super) task_id: usize,
+    pub task_id: usize,
     /// Task scheduled time
-    pub(super) scheduled_time: u128,
+    pub scheduled_time: u128,
     /// Task launch time
-    pub(super) launch_time: u128,
+    pub launch_time: u128,
     /// Start execution time
-    pub(super) start_exec_time: u128,
+    pub start_exec_time: u128,
     /// Finish execution time
-    pub(super) end_exec_time: u128,
+    pub end_exec_time: u128,
     /// Task finish time
-    pub(super) finish_time: u128,
+    pub finish_time: u128,
     /// Task Status
-    pub(super) task_status: task_status::Status,
-    //pub(crate) session_config: Arc<SessionConfig>,
+    pub task_status: task_status::Status,
 }
 
 impl UnresolvedStage {
-    pub(super) fn new(
+    pub fn new(
         stage_id: usize,
         plan: Arc<dyn ExecutionPlan>,
         output_links: Vec<usize>,
@@ -269,7 +270,7 @@ impl UnresolvedStage {
         }
     }
 
-    pub(super) fn new_with_inputs(
+    pub fn new_with_inputs(
         stage_id: usize,
         stage_attempt_num: usize,
         plan: Arc<dyn ExecutionPlan>,
@@ -290,7 +291,7 @@ impl UnresolvedStage {
     }
 
     /// Add input partitions published from an input stage.
-    pub(super) fn add_input_partitions(
+    pub fn add_input_partitions(
         &mut self,
         stage_id: usize,
         locations: Vec<PartitionLocation>,
@@ -308,7 +309,7 @@ impl UnresolvedStage {
 
     /// Remove input partitions from an input stage on a given executor.
     /// Return the HashSet of removed map partition ids
-    pub(super) fn remove_input_partitions(
+    pub fn remove_input_partitions(
         &mut self,
         input_stage_id: usize,
         _input_partition_id: usize,
@@ -336,7 +337,7 @@ impl UnresolvedStage {
     }
 
     /// Marks the input stage ID as complete.
-    pub(super) fn complete_input(&mut self, stage_id: usize) {
+    pub fn complete_input(&mut self, stage_id: usize) {
         if let Some(input) = self.inputs.get_mut(&stage_id) {
             input.complete = true;
         }
@@ -344,12 +345,12 @@ impl UnresolvedStage {
 
     /// Returns true if all inputs are complete and we can resolve all
     /// UnresolvedShuffleExec operators to ShuffleReadExec
-    pub(super) fn resolvable(&self) -> bool {
+    pub fn resolvable(&self) -> bool {
         self.inputs.iter().all(|(_, input)| input.is_complete())
     }
 
     /// Change to the resolved state
-    pub(super) fn to_resolved(&self) -> Result<ResolvedStage> {
+    pub fn to_resolved(&self) -> Result<ResolvedStage> {
         let input_locations = self
             .inputs
             .iter()
@@ -397,7 +398,7 @@ impl Debug for UnresolvedStage {
 }
 
 impl ResolvedStage {
-    pub(super) fn new(
+    pub fn new(
         stage_id: usize,
         stage_attempt_num: usize,
         plan: Arc<dyn ExecutionPlan>,
@@ -421,7 +422,7 @@ impl ResolvedStage {
     }
 
     /// Change to the running state
-    pub(super) fn to_running(&self) -> RunningStage {
+    pub fn to_running(&self) -> RunningStage {
         RunningStage::new(
             self.stage_id,
             self.stage_attempt_num,
@@ -434,7 +435,7 @@ impl ResolvedStage {
     }
 
     /// Change to the unresolved state
-    pub(super) fn to_unresolved(&self) -> Result<UnresolvedStage> {
+    pub fn to_unresolved(&self) -> Result<UnresolvedStage> {
         let new_plan = 
crate::planner::rollback_resolved_shuffles(self.plan.clone())?;
 
         let unresolved = UnresolvedStage::new_with_inputs(
@@ -463,7 +464,7 @@ impl Debug for ResolvedStage {
 }
 
 impl RunningStage {
-    pub(super) fn new(
+    pub fn new(
         stage_id: usize,
         stage_attempt_num: usize,
         plan: Arc<dyn ExecutionPlan>,
@@ -475,6 +476,10 @@ impl RunningStage {
         Self {
             stage_id,
             stage_attempt_num,
+            stage_running_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis(),
             partitions,
             output_links,
             inputs,
@@ -486,7 +491,7 @@ impl RunningStage {
         }
     }
 
-    pub(super) fn to_successful(&self) -> SuccessfulStage {
+    pub fn to_successful(&self) -> SuccessfulStage {
         let task_infos = self
             .task_infos
             .iter()
@@ -517,7 +522,7 @@ impl RunningStage {
         }
     }
 
-    pub(super) fn to_failed(&self, error_message: String) -> FailedStage {
+    pub fn to_failed(&self, error_message: String) -> FailedStage {
         FailedStage {
             stage_id: self.stage_id,
             stage_attempt_num: self.stage_attempt_num,
@@ -531,7 +536,7 @@ impl RunningStage {
     }
 
     /// Change to the unresolved state and bump the stage attempt number
-    pub(super) fn to_unresolved(
+    pub fn to_unresolved(
         &self,
         failure_reasons: HashSet<String>,
     ) -> Result<UnresolvedStage> {
@@ -550,7 +555,7 @@ impl RunningStage {
     }
 
     /// Returns `true` if all tasks for this stage are successful
-    pub(super) fn is_successful(&self) -> bool {
+    pub fn is_successful(&self) -> bool {
         self.task_infos.iter().all(|info| {
             matches!(
                 info,
@@ -563,7 +568,7 @@ impl RunningStage {
     }
 
     /// Returns the number of successful tasks
-    pub(super) fn successful_tasks(&self) -> usize {
+    pub fn successful_tasks(&self) -> usize {
         self.task_infos
             .iter()
             .filter(|info| {
@@ -579,12 +584,12 @@ impl RunningStage {
     }
 
     /// Returns the number of scheduled tasks
-    pub(super) fn scheduled_tasks(&self) -> usize {
+    pub fn scheduled_tasks(&self) -> usize {
         self.task_infos.iter().filter(|s| s.is_some()).count()
     }
 
     /// Returns a vector of currently running tasks in this stage
-    pub(super) fn running_tasks(&self) -> Vec<(usize, usize, usize, String)> {
+    pub fn running_tasks(&self) -> Vec<(usize, usize, usize, String)> {
         self.task_infos
             .iter()
             .enumerate()
@@ -601,16 +606,12 @@ impl RunningStage {
     /// Returns the number of tasks in this stage which are available for 
scheduling.
     /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
     /// return the number of tasks where the task info is not yet set.
-    pub(super) fn available_tasks(&self) -> usize {
+    pub fn available_tasks(&self) -> usize {
         self.task_infos.iter().filter(|s| s.is_none()).count()
     }
 
     /// Update the TaskInfo for task partition
-    pub(super) fn update_task_info(
-        &mut self,
-        partition_id: usize,
-        status: TaskStatus,
-    ) -> bool {
+    pub fn update_task_info(&mut self, partition_id: usize, status: TaskStatus) -> bool {
         debug!("Updating TaskInfo for partition {}", partition_id);
         let task_info = self.task_infos[partition_id].as_ref().unwrap();
         let task_id = task_info.task_id;
@@ -647,7 +648,7 @@ impl RunningStage {
     }
 
     /// update and combine the task metrics to the stage metrics
-    pub(super) fn update_task_metrics(
+    pub fn update_task_metrics(
         &mut self,
         partition: usize,
         metrics: Vec<OperatorMetricsSet>,
@@ -690,7 +691,7 @@ impl RunningStage {
         Ok(())
     }
 
-    pub(super) fn combine_metrics_set(
+    pub fn combine_metrics_set(
         first: &mut MetricsSet,
         second: Vec<MetricValue>,
         partition: usize,
@@ -703,7 +704,7 @@ impl RunningStage {
         first.aggregate_by_name()
     }
 
-    pub(super) fn task_failure_number(&self, partition_id: usize) -> usize {
+    pub fn task_failure_number(&self, partition_id: usize) -> usize {
         self.task_failure_numbers[partition_id]
     }
 
@@ -745,7 +746,7 @@ impl RunningStage {
 
     /// Remove input partitions from an input stage on a given executor.
     /// Return the HashSet of removed map partition ids
-    pub(super) fn remove_input_partitions(
+    pub fn remove_input_partitions(
         &mut self,
         input_stage_id: usize,
         _input_partition_id: usize,
@@ -821,6 +822,10 @@ impl SuccessfulStage {
             task_failure_numbers: vec![0; self.partitions],
             stage_metrics,
             session_config: self.session_config.clone(),
+            stage_running_time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis(),
         }
     }
 
@@ -881,7 +886,7 @@ impl Debug for SuccessfulStage {
 
 impl FailedStage {
     /// Returns the number of successful tasks
-    pub(super) fn successful_tasks(&self) -> usize {
+    pub fn successful_tasks(&self) -> usize {
         self.task_infos
             .iter()
             .filter(|info| {
@@ -896,14 +901,14 @@ impl FailedStage {
             .count()
     }
     /// Returns the number of scheduled tasks
-    pub(super) fn scheduled_tasks(&self) -> usize {
+    pub fn scheduled_tasks(&self) -> usize {
         self.task_infos.iter().filter(|s| s.is_some()).count()
     }
 
     /// Returns the number of tasks in this stage which are available for 
scheduling.
     /// If the stage is not yet resolved, then this will return `0`, otherwise 
it will
     /// return the number of tasks where the task status is not yet set.
-    pub(super) fn available_tasks(&self) -> usize {
+    pub fn available_tasks(&self) -> usize {
         self.task_infos.iter().filter(|s| s.is_none()).count()
     }
 }
@@ -942,7 +947,7 @@ fn get_stage_partitions(plan: Arc<dyn ExecutionPlan>) -> 
usize {
 /// When all tasks for the child stage are complete, it will mark the 
`StageOutput`
 /// as complete.
 #[derive(Clone, Debug, Default)]
-pub(crate) struct StageOutput {
+pub struct StageOutput {
     /// Map from partition -> partition locations
     pub partition_locations: HashMap<usize, Vec<PartitionLocation>>,
     /// Flag indicating whether all tasks are complete
@@ -950,7 +955,7 @@ pub(crate) struct StageOutput {
 }
 
 impl StageOutput {
-    pub(super) fn new() -> Self {
+    pub fn new() -> Self {
         Self {
             partition_locations: HashMap::new(),
             complete: false,
@@ -958,7 +963,7 @@ impl StageOutput {
     }
 
     /// Add a `PartitionLocation` to the `StageOutput`
-    pub(super) fn add_partition(&mut self, partition_location: PartitionLocation) {
+    pub fn add_partition(&mut self, partition_location: PartitionLocation) {
         if let Some(parts) = self
             .partition_locations
             .get_mut(&partition_location.partition_id.partition_id)
@@ -972,7 +977,7 @@ impl StageOutput {
         }
     }
 
-    pub(super) fn is_complete(&self) -> bool {
+    pub fn is_complete(&self) -> bool {
         self.complete
     }
 }
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index ba49149c..9c4c5acb 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -66,26 +66,25 @@ impl ExecutorManager {
         Ok(())
     }
 
-    /// Bind the ready to running tasks from [`active_jobs`] with available executors.
     ///
-    /// If `executors` is provided, only bind slots from the specified executor IDs
+    /// Bind the ready to run tasks from [`active_jobs`] to available executors.
     pub async fn bind_schedulable_tasks(
         &self,
-        active_jobs: Arc<HashMap<String, JobInfoCache>>,
+        running_jobs: Arc<HashMap<String, JobInfoCache>>,
     ) -> Result<Vec<BoundTask>> {
-        if active_jobs.is_empty() {
-            warn!("There's no active jobs for binding tasks");
+        if running_jobs.is_empty() {
+            debug!("There's no active jobs for binding tasks");
             return Ok(vec![]);
         }
         let alive_executors = self.get_alive_executors();
         if alive_executors.is_empty() {
-            warn!("There's no alive executors for binding tasks");
+            debug!("There's no alive executors for binding tasks");
             return Ok(vec![]);
         }
         self.cluster_state
             .bind_schedulable_tasks(
-                self.config.task_distribution,
-                active_jobs,
+                self.config.task_distribution.clone(),
+                running_jobs,
                 Some(alive_executors),
             )
             .await
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 4394dc00..c628aaf2 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -49,6 +49,7 @@ use prost::Message;
 
 pub mod execution_graph;
 pub mod execution_graph_dot;
+pub mod execution_stage;
 pub mod executor_manager;
 pub mod session_manager;
 pub mod task_manager;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to