thinkharderdev commented on code in PR #153:
URL: https://github.com/apache/arrow-ballista/pull/153#discussion_r954350298


##########
ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs:
##########
@@ -0,0 +1,914 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::task_status;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph represents a set of tasks
+/// (one per `partition`) which can be executed concurrently.
+/// A stage has five possible states, and the state machine is as follows:
+///
+/// UnResolvedStage           FailedStage
+///       ↓            ↙           ↑
+///  ResolvedStage     →     RunningStage
+///                                ↓
+///                         CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+    UnResolved(UnResolvedStage),
+    Resolved(ResolvedStage),
+    Running(RunningStage),
+    Completed(CompletedStage),
+    Failed(FailedStage),
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ExecutionStage::UnResolved(unresolved_stage) => unresolved_stage.fmt(f),
+            ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f),
+            ExecutionStage::Running(running_stage) => running_stage.fmt(f),
+            ExecutionStage::Completed(completed_stage) => completed_stage.fmt(f),
+            ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f),
+        }
+    }
+}
+
+/// For a stage whose input stages are not all completed, we say it's an unresolved stage
+#[derive(Clone)]
+pub(super) struct UnResolvedStage {

Review Comment:
   ```suggestion
   pub(super) struct UnresolvedStage {
   ```



##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -175,145 +154,226 @@ impl ExecutionGraph {
 
     /// An ExecutionGraph is complete if all its stages are complete
     pub fn complete(&self) -> bool {
-        self.stages.values().all(|s| s.complete())
+        self.stages
+            .values()
+            .all(|s| matches!(s, ExecutionStage::Completed(_)))
+    }
+
+    /// Revive the execution graph by converting the resolved stages to running changes
+    /// If existing such change, return true; else false.

Review Comment:
   ```suggestion
        /// Revive the execution graph by converting the resolved stages to running stages
       /// If any stages are converted, return true; else false.
   ```
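
   To make the intended semantics concrete, here is a minimal, self-contained sketch of what `revive` could look like (simplified stand-in types; the real method lives on `ExecutionGraph` in this PR):

   ```rust
   use std::collections::HashMap;

   // Simplified stand-ins for the stage states introduced in this PR.
   #[derive(Clone)]
   enum ExecutionStage {
       Resolved(usize),
       Running(usize),
   }

   struct ExecutionGraph {
       stages: HashMap<usize, ExecutionStage>,
   }

   impl ExecutionGraph {
       /// Convert every Resolved stage into a Running stage.
       /// Returns true if any stage was converted.
       fn revive(&mut self) -> bool {
           let mut revived = false;
           for stage in self.stages.values_mut() {
               if let ExecutionStage::Resolved(id) = stage {
                   let id = *id; // copy the payload out before replacing the variant
                   *stage = ExecutionStage::Running(id);
                   revived = true;
               }
           }
           revived
       }
   }
   ```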



##########
ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs:
##########
@@ -0,0 +1,914 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::task_status;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph represents a set of tasks
+/// (one per `partition`) which can be executed concurrently.
+/// A stage has five possible states, and the state machine is as follows:
+///
+/// UnResolvedStage           FailedStage
+///       ↓            ↙           ↑
+///  ResolvedStage     →     RunningStage
+///                                ↓
+///                         CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+    UnResolved(UnResolvedStage),
+    Resolved(ResolvedStage),
+    Running(RunningStage),
+    Completed(CompletedStage),
+    Failed(FailedStage),
+}

Review Comment:
   I really like representing this as an enum!
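
   One concrete benefit of the enum representation, as an illustrative sketch (unit variants standing in for the structs in this PR): the compiler enforces exhaustive handling, so adding a sixth state will not compile until every transition site is revisited.

   ```rust
   // Simplified stand-in for the ExecutionStage enum above.
   enum ExecutionStage {
       UnResolved,
       Resolved,
       Running,
       Completed,
       Failed,
   }

   fn is_terminal(stage: &ExecutionStage) -> bool {
       // No catch-all arm: a new variant turns this match into a compile
       // error until its transitions are considered explicitly.
       match stage {
           ExecutionStage::UnResolved
           | ExecutionStage::Resolved
           | ExecutionStage::Running => false,
           ExecutionStage::Completed | ExecutionStage::Failed => true,
       }
   }
   ```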



##########
ballista/rust/scheduler/src/state/execution_graph/execution_stage.rs:
##########
@@ -0,0 +1,914 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::TryInto;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::metrics::{MetricValue, MetricsSet};
+use datafusion::physical_plan::{ExecutionPlan, Metric, Partitioning};
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, warn};
+
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::task_status;
+use ballista_core::serde::protobuf::{self, OperatorMetricsSet};
+use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
+use ballista_core::serde::scheduler::PartitionLocation;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+
+use crate::display::DisplayableBallistaExecutionPlan;
+
+/// A stage in the ExecutionGraph represents a set of tasks
+/// (one per `partition`) which can be executed concurrently.
+/// A stage has five possible states, and the state machine is as follows:
+///
+/// UnResolvedStage           FailedStage
+///       ↓            ↙           ↑
+///  ResolvedStage     →     RunningStage
+///                                ↓
+///                         CompletedStage
+#[derive(Clone)]
+pub(super) enum ExecutionStage {
+    UnResolved(UnResolvedStage),
+    Resolved(ResolvedStage),
+    Running(RunningStage),
+    Completed(CompletedStage),
+    Failed(FailedStage),
+}
+
+impl Debug for ExecutionStage {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            ExecutionStage::UnResolved(unresolved_stage) => unresolved_stage.fmt(f),
+            ExecutionStage::Resolved(resolved_stage) => resolved_stage.fmt(f),
+            ExecutionStage::Running(running_stage) => running_stage.fmt(f),
+            ExecutionStage::Completed(completed_stage) => completed_stage.fmt(f),
+            ExecutionStage::Failed(failed_stage) => failed_stage.fmt(f),
+        }
+    }
+}
+
+/// For a stage whose input stages are not all completed, we say it's an unresolved stage
+#[derive(Clone)]
+pub(super) struct UnResolvedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// Represents the outputs from this stage's child stages.
+    /// This stage can only be resolved and executed once all child stages are completed.
+    pub(super) inputs: HashMap<usize, StageOutput>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// For a stage, if it has no inputs or all of its input stages are completed,
+/// then we say it's a resolved stage
+#[derive(Clone)]
+pub(super) struct ResolvedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+}
+
+/// Different from the resolved stage, a running stage will
+/// 1. save the execution plan in encoded form to avoid serialization cost when creating task definitions
+/// 2. manage the task statuses
+/// 3. manage the stage-level combined metrics
+/// Running stages will only be maintained in memory and will not be saved to the backend storage
+#[derive(Clone)]
+pub(super) struct RunningStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled
+    pub(super) task_statuses: Vec<Option<task_status::Status>>,
+    /// Combined metrics of the already finished tasks in the stage. If it is None, no task is finished yet.
+    pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+}
+
+/// If a stage finishes successfully, its task statuses and metrics will be finalized
+#[derive(Clone)]
+pub(super) struct CompletedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task.
+    pub(super) task_statuses: Vec<task_status::Status>,
+    /// Combined metrics of the already finished tasks in the stage.
+    pub(super) stage_metrics: Vec<MetricsSet>,
+}
+
+/// If a stage fails, it will carry an error message
+#[derive(Clone)]
+pub(super) struct FailedStage {
+    /// Stage ID
+    pub(super) stage_id: usize,
+    /// Total number of output partitions for this stage.
+    /// This stage will produce one task per partition.
+    pub(super) partitions: usize,
+    /// Output partitioning for this stage.
+    pub(super) output_partitioning: Option<Partitioning>,
+    /// Stage ID of the stage that will take this stage's outputs as inputs.
+    /// If `output_links` is empty then this is the final stage in the `ExecutionGraph`
+    pub(super) output_links: Vec<usize>,
+    /// `ExecutionPlan` for this stage
+    pub(super) plan: Arc<dyn ExecutionPlan>,
+    /// Status of each already scheduled task. If status is None, the partition has not yet been scheduled
+    pub(super) task_statuses: Vec<Option<task_status::Status>>,
+    /// Combined metrics of the already finished tasks in the stage. If it is None, no task is finished yet.
+    pub(super) stage_metrics: Option<Vec<MetricsSet>>,
+    /// Error message
+    pub(super) error_message: String,
+}
+
+impl UnResolvedStage {

Review Comment:
   ```suggestion
   impl UnresolvedStage {
   ```



##########
ballista/rust/scheduler/src/scheduler_server/event.rs:
##########
@@ -36,6 +36,10 @@ pub enum QueryStageSchedulerEvent {
         plan: Box<LogicalPlan>,
     },
     JobSubmitted(String),
-    JobFinished(String),
+    // For a job fails without its execution graph
     JobFailed(String, String),

Review Comment:
   ```suggestion
       // For a job which failed during planning
       JobPlanningFailed(String, String),
   ```



##########
ballista/rust/scheduler/src/state/task_manager.rs:
##########
@@ -55,44 +58,62 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     clients: ExecutorClients,
     session_builder: SessionBuilder,
     codec: BallistaCodec<T, U>,
+    scheduler_id: String,
+    // Cache for active execution graphs curated by this scheduler
+    active_job_cache: ExecutionGraphCache,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
     pub fn new(
         state: Arc<dyn StateBackendClient>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
+        scheduler_id: String,
     ) -> Self {
         Self {
             state,
             clients: Default::default(),
             session_builder,
             codec,
+            scheduler_id,
+            active_job_cache: Arc::new(RwLock::new(HashMap::new())),
         }
     }
 
     /// Generate an ExecutionGraph for the job and save it to the persistent state.
+    /// By default, this job will be curated by the scheduler which receives it.
+    /// Then we will also save it to the active execution graph cache
     pub async fn submit_job(
         &self,
         job_id: &str,
         session_id: &str,
         plan: Arc<dyn ExecutionPlan>,
     ) -> Result<()> {
-        let graph = ExecutionGraph::new(job_id, session_id, plan)?;
+        let mut graph =
+            ExecutionGraph::new(&self.scheduler_id, job_id, session_id, plan)?;
         self.state
             .put(
                 Keyspace::ActiveJobs,
                 job_id.to_owned(),
-                self.encode_execution_graph(graph)?,
+                self.encode_execution_graph(graph.clone())?,
             )
             .await?;
 
+        graph.revive();

Review Comment:
   Why would we need to call `revive` here?



##########
ballista/rust/scheduler/src/state/task_manager.rs:
##########
@@ -106,123 +127,63 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         }
     }
 
-    /// Generate a new random Job ID
-    pub fn generate_job_id(&self) -> String {
-        let mut rng = thread_rng();
-        std::iter::repeat(())
-            .map(|()| rng.sample(Alphanumeric))
-            .map(char::from)
-            .take(7)
-            .collect()
-    }
-
-    /// Atomically update given task statuses in the respective job and return a tuple containing:
+    /// Update given task statuses in the respective job and return a tuple containing:
     /// 1. A list of QueryStageSchedulerEvent to publish.
     /// 2. A list of reservations that can now be offered.
-    ///
-    /// When a task is updated, there may or may not be more tasks pending for its job. If there are more
-    /// tasks pending then we want to reschedule one of those tasks on the same task slot. In that case
-    /// we will set the `job_id` on the `ExecutorReservation` so the scheduler attempts to assign tasks from
-    /// the same job. Note that when the scheduler attempts to fill the reservation, there is no guarantee
-    /// that the available task is still available.
     pub(crate) async fn update_task_statuses(
         &self,
         executor: &ExecutorMetadata,
         task_status: Vec<TaskStatus>,
     ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
-        let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
-
-        with_lock(lock, async {
-            let mut events: Vec<QueryStageSchedulerEvent> = vec![];
-            let mut reservation: Vec<ExecutorReservation> = vec![];
+        let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
+        for status in task_status {
+            debug!("Task Update\n{:?}", status);
+            if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) {
+                let job_task_statuses =
+                    job_updates.entry(job_id.clone()).or_insert_with(Vec::new);
+                job_task_statuses.push(status);
+            } else {
+                warn!("Received task with no job ID");
+            }
+        }
 
-            let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
+        let mut events: Vec<QueryStageSchedulerEvent> = vec![];
+        let mut total_num_tasks = 0;
+        for (job_id, statuses) in job_updates {
+            let num_tasks = statuses.len();
+            debug!("Updating {} tasks in job {}", num_tasks, job_id);
 
-            for status in task_status {
-                debug!("Task Update\n{:?}", status);
-                if let Some(job_id) = status.task_id.as_ref().map(|id| &id.job_id) {
-                    if let Some(statuses) = job_updates.get_mut(job_id) {
-                        statuses.push(status)
-                    } else {
-                        job_updates.insert(job_id.clone(), vec![status]);
-                    }
-                } else {
-                    warn!("Received task with no job ID");
-                }
-            }
+            total_num_tasks += num_tasks;
 
-            let mut txn_ops: Vec<(Keyspace, String, Vec<u8>)> = vec![];
-
-            for (job_id, statuses) in job_updates {
-                let num_tasks = statuses.len();
-                debug!("Updating {} tasks in job {}", num_tasks, job_id);
-
-                let mut graph = self.get_execution_graph(&job_id).await?;
-
-                graph.update_task_status(executor, statuses)?;
-
-                if graph.complete() {
-                    // If this ExecutionGraph is complete, finalize it
-                    info!(
-                        "Job {} is complete, finalizing output partitions",
-                        graph.job_id()
-                    );
-                    graph.finalize()?;
-                    events.push(QueryStageSchedulerEvent::JobFinished(job_id.clone()));
-
-                    for _ in 0..num_tasks {
-                        reservation
-                            .push(ExecutorReservation::new_free(executor.id.to_owned()));
-                    }
-                } else if let Some(job_status::Status::Failed(failure)) =
-                    graph.status().status
-                {
-                    events.push(QueryStageSchedulerEvent::JobFailed(
-                        job_id.clone(),
-                        failure.error,
-                    ));
-
-                    for _ in 0..num_tasks {
-                        reservation
-                            .push(ExecutorReservation::new_free(executor.id.to_owned()));
-                    }
-                } else {
-                    // Otherwise keep the task slots reserved for this job
-                    for _ in 0..num_tasks {
-                        reservation.push(ExecutorReservation::new_assigned(
-                            executor.id.to_owned(),
-                            job_id.clone(),
-                        ));
-                    }
-                }
+            let graph = self.get_active_execution_graph(&job_id).await;
+            let job_event = if let Some(graph) = graph {
+                let mut graph = graph.write().await;
+                graph.update_task_status(executor, statuses)?
+            } else {
+                // TODO Deal with curator changed case
+                error!("Failed to find job {} in the active cache and it may not be curated by this scheduler", job_id);
+                None

Review Comment:
   For now it seems like we should explicitly fail the job here so the job state gets updated.
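
   A minimal sketch of that suggestion (the `JobFailed(String, String)` variant is from this PR; the helper and its exact wiring here are assumed):

   ```rust
   // Stand-in for the event enum in this PR: (job_id, error message).
   enum QueryStageSchedulerEvent {
       JobFailed(String, String),
   }

   // Instead of only logging when a job is missing from the active cache,
   // emit a failure event so the persisted job state gets updated.
   fn fail_missing_job(job_id: &str, events: &mut Vec<QueryStageSchedulerEvent>) {
       events.push(QueryStageSchedulerEvent::JobFailed(
           job_id.to_string(),
           format!("Job {} not found in the active execution graph cache", job_id),
       ));
   }
   ```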



##########
ballista/rust/scheduler/src/state/task_manager.rs:
##########
@@ -37,16 +37,19 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
-use log::{debug, info, warn};
+#[cfg(not(test))]
+use log::info;
+use log::{debug, error, warn};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::default::Default;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 use tonic::transport::Channel;
 
 type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
+type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;
 
 #[derive(Clone)]
 pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {

Review Comment:
   This requires changes to the executors as well, right? Currently the executor will just have a single connection to the scheduler, but it will need to accept the scheduler URL as part of the task definition for this to work correctly.


##########
ballista/rust/scheduler/src/state/task_manager.rs:
##########
@@ -37,16 +37,19 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
-use log::{debug, info, warn};
+#[cfg(not(test))]
+use log::info;
+use log::{debug, error, warn};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::default::Default;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 use tonic::transport::Channel;
 
 type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
+type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;

Review Comment:
   What do you think about making this an `Arc<RwLock<VecDeque<(String, ExecutionGraph)>>>`? Lookup by job ID becomes `O(n)`, but the number of active jobs shouldn't be prohibitive, and you get a proper queue in return so we can do FIFO scheduling of new jobs.
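
   A sketch of that alternative layout (assuming `ExecutionGraph` is `Clone`, which this PR already relies on elsewhere):

   ```rust
   use std::collections::VecDeque;
   use std::sync::Arc;
   use tokio::sync::RwLock;

   // Stand-in for the graph type in this PR.
   #[derive(Clone)]
   struct ExecutionGraph;

   // FIFO of active jobs: push_back on submit, pop_front (or iterate
   // front-to-back) to schedule; lookup by job ID becomes O(n).
   type ExecutionGraphQueue = Arc<RwLock<VecDeque<(String, ExecutionGraph)>>>;

   async fn find_job(queue: &ExecutionGraphQueue, job_id: &str) -> Option<ExecutionGraph> {
       let guard = queue.read().await;
       guard
           .iter()
           .find(|(id, _)| id.as_str() == job_id)
           .map(|(_, graph)| graph.clone())
   }
   ```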



##########
ballista/rust/scheduler/src/state/execution_graph.rs:
##########
@@ -15,61 +15,40 @@
 // specific language governing permissions and limitations

Review Comment:
   Should this entire file move to `src/state/execution_graph/mod.rs`?


