This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new f19c0fb8 Remove revive offer event loop (#156)
f19c0fb8 is described below

commit f19c0fb88edd5e3ac52ab3e5cdb6c5f87c63b2e7
Author: yahoNanJing <[email protected]>
AuthorDate: Thu Sep 1 20:59:39 2022 +0800

    Remove revive offer event loop (#156)
    
    * Add TaskUpdating event for QueryStageSchedulerEvent
    
    * Remove revive offer event loop
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/src/event_loop.rs               |   4 +
 ballista/rust/scheduler/src/flight_sql.rs          |  19 +-
 .../rust/scheduler/src/scheduler_server/event.rs   |   3 +
 .../scheduler/src/scheduler_server/event_loop.rs   | 415 --------------------
 .../rust/scheduler/src/scheduler_server/grpc.rs    |  56 +--
 .../rust/scheduler/src/scheduler_server/mod.rs     | 421 ++++++++-------------
 .../src/scheduler_server/query_stage_scheduler.rs  | 107 +++---
 ballista/rust/scheduler/src/state/mod.rs           | 409 +++++++++++++++++++-
 ballista/rust/scheduler/src/state/task_manager.rs  |  11 +-
 ballista/rust/scheduler/src/test_utils.rs          |  44 +--
 10 files changed, 641 insertions(+), 848 deletions(-)

diff --git a/ballista/rust/core/src/event_loop.rs b/ballista/rust/core/src/event_loop.rs
index 595bd33f..74ee4ebf 100644
--- a/ballista/rust/core/src/event_loop.rs
+++ b/ballista/rust/core/src/event_loop.rs
@@ -128,6 +128,10 @@ pub struct EventSender<E> {
 }
 
 impl<E> EventSender<E> {
+    pub fn new(tx_event: mpsc::Sender<E>) -> Self {
+        Self { tx_event }
+    }
+
     pub async fn post_event(&self, event: E) -> Result<()> {
         self.tx_event.send(event).await.map_err(|e| {
             BallistaError::General(format!("Fail to send event due to {}", e))
diff --git a/ballista/rust/scheduler/src/flight_sql.rs b/ballista/rust/scheduler/src/flight_sql.rs
index 08486fc2..ef98e0da 100644
--- a/ballista/rust/scheduler/src/flight_sql.rs
+++ b/ballista/rust/scheduler/src/flight_sql.rs
@@ -35,7 +35,6 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use tonic::{Response, Status, Streaming};
 
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::SchedulerServer;
 use arrow_flight::SchemaAsIpc;
 use ballista_core::config::BallistaConfig;
@@ -243,22 +242,8 @@ impl FlightSqlServiceImpl {
         plan: &LogicalPlan,
     ) -> Result<String, Status> {
         let job_id = self.server.state.task_manager.generate_job_id();
-        let query_stage_event_sender = self
-            .server
-            .query_stage_event_loop
-            .get_sender()
-            .map_err(|e| {
-                Status::internal(format!(
-                    "Could not get query stage event sender due to: {}",
-                    e
-                ))
-            })?;
-        query_stage_event_sender
-            .post_event(QueryStageSchedulerEvent::JobQueued {
-                job_id: job_id.clone(),
-                session_ctx: ctx,
-                plan: Box::new(plan.clone()),
-            })
+        self.server
+            .submit_job(&job_id, ctx, plan)
             .await
             .map_err(|e| {
                 let msg =
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs
index 0e43bfe2..10793ccc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -19,6 +19,7 @@ use crate::state::executor_manager::ExecutorReservation;
 
 use datafusion::logical_plan::LogicalPlan;
 
+use ballista_core::serde::protobuf::TaskStatus;
 use datafusion::prelude::SessionContext;
 use std::sync::Arc;
 
@@ -42,4 +43,6 @@ pub enum QueryStageSchedulerEvent {
     // For a job fails with its execution graph setting failed
     JobRunningFailed(String),
     JobUpdated(String),
+    TaskUpdating(String, Vec<TaskStatus>),
+    ReservationOffering(Vec<ExecutorReservation>),
 }
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
deleted file mode 100644
index d60d145e..00000000
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ /dev/null
@@ -1,415 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use log::{error, info};
-
-use crate::scheduler_server::event::SchedulerServerEvent;
-use ballista_core::error::{BallistaError, Result};
-use ballista_core::event_loop::EventAction;
-
-use ballista_core::serde::AsExecutionPlan;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use tokio::sync::mpsc;
-
-use crate::state::executor_manager::ExecutorReservation;
-use crate::state::SchedulerState;
-
-/// EventAction which will process `SchedulerServerEvent`s.
-/// In push-based scheduling, this is the primary mechanism for scheduling 
tasks
-/// on executors.
-pub(crate) struct SchedulerServerEventAction<
-    T: 'static + AsLogicalPlan,
-    U: 'static + AsExecutionPlan,
-> {
-    state: Arc<SchedulerState<T, U>>,
-}
-
-impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
-    SchedulerServerEventAction<T, U>
-{
-    pub fn new(state: Arc<SchedulerState<T, U>>) -> Self {
-        Self { state }
-    }
-
-    /// Process reservations which are offered. The basic process is
-    /// 1. Attempt to fill the offered reservations with available tasks
-    /// 2. For any reservation that filled, launch the assigned task on the 
executor.
-    /// 3. For any reservations that could not be filled, cancel the 
reservation (i.e. return the
-    ///    task slot back to the pool of available task slots).
-    ///
-    /// NOTE Error handling in this method is very important. No matter what 
we need to ensure
-    /// that unfilled reservations are cancelled or else they could become 
permanently "invisible"
-    /// to the scheduler.
-    async fn offer_reservation(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<Option<SchedulerServerEvent>> {
-        let (free_list, pending_tasks) = match self
-            .state
-            .task_manager
-            .fill_reservations(&reservations)
-            .await
-        {
-            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
-                for (executor_id, task) in assignments.into_iter() {
-                    match self
-                        .state
-                        .executor_manager
-                        .get_executor_metadata(&executor_id)
-                        .await
-                    {
-                        Ok(executor) => {
-                            if let Err(e) =
-                                self.state.task_manager.launch_task(&executor, 
task).await
-                            {
-                                error!("Failed to launch new task: {:?}", e);
-                                unassigned_reservations.push(
-                                    
ExecutorReservation::new_free(executor_id.clone()),
-                                );
-                            }
-                        }
-                        Err(e) => {
-                            error!("Failed to launch new task, could not get 
executor metadata: {:?}", e);
-                            unassigned_reservations
-                                
.push(ExecutorReservation::new_free(executor_id.clone()));
-                        }
-                    }
-                }
-                (unassigned_reservations, pending_tasks)
-            }
-            Err(e) => {
-                error!("Error filling reservations: {:?}", e);
-                (reservations, 0)
-            }
-        };
-
-        dbg!(free_list.clone());
-        dbg!(pending_tasks);
-        // If any reserved slots remain, return them to the pool
-        if !free_list.is_empty() {
-            self.state
-                .executor_manager
-                .cancel_reservations(free_list)
-                .await?;
-            Ok(None)
-        } else if pending_tasks > 0 {
-            // If there are pending tasks available, try and schedule them
-            let new_reservations = self
-                .state
-                .executor_manager
-                .reserve_slots(pending_tasks as u32)
-                .await?;
-            Ok(Some(SchedulerServerEvent::Offer(new_reservations)))
-        } else {
-            Ok(None)
-        }
-    }
-}
-
-#[async_trait]
-impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
-    EventAction<SchedulerServerEvent> for SchedulerServerEventAction<T, U>
-{
-    fn on_start(&self) {
-        info!("Starting SchedulerServerEvent handler")
-    }
-
-    fn on_stop(&self) {
-        info!("Stopping SchedulerServerEvent handler")
-    }
-
-    async fn on_receive(
-        &self,
-        event: SchedulerServerEvent,
-        tx_event: &mpsc::Sender<SchedulerServerEvent>,
-        _rx_event: &mpsc::Receiver<SchedulerServerEvent>,
-    ) -> Result<()> {
-        match event {
-            SchedulerServerEvent::Offer(reservations) => {
-                if let Some(event) = 
self.offer_reservation(reservations).await? {
-                    tx_event.send(event).await.map_err(|e| {
-                        BallistaError::General(format!("Fail to send event due 
to {}", e))
-                    })?;
-                }
-            }
-        }
-
-        Ok(())
-    }
-
-    fn on_error(&self, error: BallistaError) {
-        error!("Error in SchedulerServerEvent handler: {:?}", error);
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use crate::scheduler_server::event::SchedulerServerEvent;
-    use crate::scheduler_server::event_loop::SchedulerServerEventAction;
-    use crate::state::backend::standalone::StandaloneClient;
-    use crate::state::SchedulerState;
-    use ballista_core::config::{BallistaConfig, 
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
-    use ballista_core::error::Result;
-    use ballista_core::serde::protobuf::{
-        task_status, CompletedTask, PartitionId, PhysicalPlanNode, 
ShuffleWritePartition,
-        TaskStatus,
-    };
-    use ballista_core::serde::scheduler::{
-        ExecutorData, ExecutorMetadata, ExecutorSpecification,
-    };
-    use ballista_core::serde::BallistaCodec;
-    use datafusion::arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::execution::context::default_session_builder;
-    use datafusion::logical_expr::{col, sum};
-    use datafusion::physical_plan::ExecutionPlan;
-    use datafusion::prelude::SessionContext;
-    use datafusion::test_util::scan_empty;
-    use datafusion_proto::protobuf::LogicalPlanNode;
-    use std::sync::Arc;
-
-    // We should free any reservations which are not assigned
-    #[tokio::test]
-    async fn test_offer_free_reservations() -> Result<()> {
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_default_scheduler_name(
-                state_storage,
-                default_session_builder,
-                BallistaCodec::default(),
-            ));
-
-        let executors = test_executors(1, 4);
-
-        let (executor_metadata, executor_data) = executors[0].clone();
-
-        let reservations = state
-            .executor_manager
-            .register_executor(executor_metadata, executor_data, true)
-            .await?;
-
-        let event_action = 
Arc::new(SchedulerServerEventAction::new(state.clone()));
-
-        let result = event_action.offer_reservation(reservations).await?;
-
-        assert!(result.is_none());
-
-        // All reservations should have been cancelled so we should be able to 
reserve them now
-        let reservations = state.executor_manager.reserve_slots(4).await?;
-
-        assert_eq!(reservations.len(), 4);
-
-        Ok(())
-    }
-
-    // We should fill unbound reservations to any available task
-    #[tokio::test]
-    async fn test_offer_fill_reservations() -> Result<()> {
-        let config = BallistaConfig::builder()
-            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
-            .build()?;
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_default_scheduler_name(
-                state_storage,
-                default_session_builder,
-                BallistaCodec::default(),
-            ));
-
-        let session_ctx = state.session_manager.create_session(&config).await?;
-
-        let plan = test_graph(session_ctx.clone()).await;
-
-        // Create 4 jobs so we have four pending tasks
-        state
-            .task_manager
-            .submit_job("job-1", session_ctx.session_id().as_str(), 
plan.clone())
-            .await?;
-        state
-            .task_manager
-            .submit_job("job-2", session_ctx.session_id().as_str(), 
plan.clone())
-            .await?;
-        state
-            .task_manager
-            .submit_job("job-3", session_ctx.session_id().as_str(), 
plan.clone())
-            .await?;
-        state
-            .task_manager
-            .submit_job("job-4", session_ctx.session_id().as_str(), 
plan.clone())
-            .await?;
-
-        let executors = test_executors(1, 4);
-
-        let (executor_metadata, executor_data) = executors[0].clone();
-
-        let reservations = state
-            .executor_manager
-            .register_executor(executor_metadata, executor_data, true)
-            .await?;
-
-        let event_action = 
Arc::new(SchedulerServerEventAction::new(state.clone()));
-
-        let result = event_action.offer_reservation(reservations).await?;
-
-        assert!(result.is_none());
-
-        // All task slots should be assigned so we should not be able to 
reserve more tasks
-        let reservations = state.executor_manager.reserve_slots(4).await?;
-
-        assert_eq!(reservations.len(), 0);
-
-        Ok(())
-    }
-
-    // We should generate a new event for tasks that are still pending
-    #[tokio::test]
-    async fn test_offer_resubmit_pending() -> Result<()> {
-        let config = BallistaConfig::builder()
-            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
-            .build()?;
-        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
-            Arc::new(SchedulerState::new_with_default_scheduler_name(
-                state_storage,
-                default_session_builder,
-                BallistaCodec::default(),
-            ));
-
-        let session_ctx = state.session_manager.create_session(&config).await?;
-
-        let plan = test_graph(session_ctx.clone()).await;
-
-        // Create a job
-        state
-            .task_manager
-            .submit_job("job-1", session_ctx.session_id().as_str(), 
plan.clone())
-            .await?;
-
-        let executors = test_executors(1, 4);
-
-        let (executor_metadata, executor_data) = executors[0].clone();
-
-        // Complete the first stage. So we should now have 4 pending tasks for 
this job stage 2
-        let mut partitions: Vec<ShuffleWritePartition> = vec![];
-
-        for partition_id in 0..4 {
-            partitions.push(ShuffleWritePartition {
-                partition_id: partition_id as u64,
-                path: "some/path".to_string(),
-                num_batches: 1,
-                num_rows: 1,
-                num_bytes: 1,
-            })
-        }
-
-        state
-            .task_manager
-            .update_task_statuses(
-                &executor_metadata,
-                vec![TaskStatus {
-                    task_id: Some(PartitionId {
-                        job_id: "job-1".to_string(),
-                        stage_id: 1,
-                        partition_id: 0,
-                    }),
-                    metrics: vec![],
-                    status: Some(task_status::Status::Completed(CompletedTask {
-                        executor_id: "executor-1".to_string(),
-                        partitions,
-                    })),
-                }],
-            )
-            .await?;
-
-        state
-            .executor_manager
-            .register_executor(executor_metadata, executor_data, false)
-            .await?;
-
-        let reservation = state.executor_manager.reserve_slots(1).await?;
-
-        assert_eq!(reservation.len(), 1);
-
-        let event_action = 
Arc::new(SchedulerServerEventAction::new(state.clone()));
-
-        // Offer the reservation. It should be filled with one of the 4 
pending tasks. The other 3 should
-        // be reserved for the other 3 tasks, emitting another offer event
-        let result = event_action.offer_reservation(reservation).await?;
-
-        assert!(result.is_some());
-
-        match result {
-            Some(SchedulerServerEvent::Offer(reservations)) => {
-                assert_eq!(reservations.len(), 3)
-            }
-            _ => panic!("Expected 3 new reservations offered"),
-        }
-
-        // Remaining 3 task slots should be reserved for pending tasks
-        let reservations = state.executor_manager.reserve_slots(4).await?;
-
-        assert_eq!(reservations.len(), 0);
-
-        Ok(())
-    }
-
-    fn test_executors(
-        total_executors: usize,
-        slots_per_executor: u32,
-    ) -> Vec<(ExecutorMetadata, ExecutorData)> {
-        let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
-
-        for i in 0..total_executors {
-            result.push((
-                ExecutorMetadata {
-                    id: format!("executor-{}", i),
-                    host: format!("host-{}", i),
-                    port: 8080,
-                    grpc_port: 9090,
-                    specification: ExecutorSpecification {
-                        task_slots: slots_per_executor,
-                    },
-                },
-                ExecutorData {
-                    executor_id: format!("executor-{}", i),
-                    total_task_slots: slots_per_executor,
-                    available_task_slots: slots_per_executor,
-                },
-            ));
-        }
-
-        result
-    }
-
-    async fn test_graph(ctx: Arc<SessionContext>) -> Arc<dyn ExecutionPlan> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Utf8, false),
-            Field::new("gmv", DataType::UInt64, false),
-        ]);
-
-        let plan = scan_empty(None, &schema, Some(vec![0, 1]))
-            .unwrap()
-            .aggregate(vec![col("id")], vec![sum(col("gmv"))])
-            .unwrap()
-            .build()
-            .unwrap();
-
-        ctx.create_physical_plan(&plan).await.unwrap()
-    }
-}
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 80a9defc..0937158b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
-
 use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, 
Query};
 
 use ballista_core::serde::protobuf::executor_registration::OptionalHost;
@@ -48,7 +47,6 @@ use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, 
SchedulerServerEvent};
 use crate::scheduler_server::SchedulerServer;
 use crate::state::executor_manager::ExecutorReservation;
 
@@ -189,30 +187,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                 available_task_slots: metadata.specification.task_slots,
             };
 
-            if let Ok(Some(sender)) =
-                self.event_loop.as_ref().map(|e| e.get_sender()).transpose()
-            {
-                // If we are using push-based scheduling then reserve this 
executors slots and send
-                // them for scheduling tasks.
+            async {
+                // Save the executor to state
                 let reservations = self
                     .state
-                    .executor_manager
-                    .register_executor(metadata, executor_data, true)
-                    .await
-                    .unwrap();
-
-                sender
-                    .post_event(SchedulerServerEvent::Offer(reservations))
-                    .await
-                    .unwrap();
-            } else {
-                // Otherwise just save the executor to state
-                self.state
                     .executor_manager
                     .register_executor(metadata, executor_data, false)
-                    .await
-                    .unwrap();
+                    .await?;
+
+                // If we are using push-based scheduling then reserve this 
executors slots and send
+                // them for scheduling tasks.
+                if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
+                    self.offer_reservation(reservations).await?;
+                }
+
+                Ok::<(), ballista_core::error::BallistaError>(())
             }
+            .await
+            .map_err(|e| {
+                let msg = format!("Fail to do executor registration due to: 
{}", e);
+                error!("{}", msg);
+                Status::internal(msg)
+            })?;
 
             Ok(Response::new(RegisterExecutorResult { success: true }))
         } else {
@@ -414,20 +410,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
 
             let job_id = self.state.task_manager.generate_job_id();
 
-            let query_stage_event_sender =
-                self.query_stage_event_loop.get_sender().map_err(|e| {
-                    Status::internal(format!(
-                        "Could not get query stage event sender due to: {}",
-                        e
-                    ))
-                })?;
-
-            query_stage_event_sender
-                .post_event(QueryStageSchedulerEvent::JobQueued {
-                    job_id: job_id.clone(),
-                    session_ctx,
-                    plan: Box::new(plan),
-                })
+            self.submit_job(&job_id, session_ctx, &plan)
                 .await
                 .map_err(|e| {
                     let msg =
@@ -535,12 +518,13 @@ mod test {
     #[tokio::test]
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
-        let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+        let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
             SchedulerServer::new(
                 "localhost:50050".to_owned(),
                 state_storage.clone(),
                 BallistaCodec::default(),
             );
+        scheduler.init().await?;
         let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
             optional_host: 
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index df376c4c..19ce8680 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -20,18 +20,15 @@ use std::time::{SystemTime, UNIX_EPOCH};
 
 use ballista_core::config::TaskSchedulingPolicy;
 use ballista_core::error::Result;
-use ballista_core::event_loop::{EventAction, EventLoop};
+use ballista_core::event_loop::EventLoop;
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
 use datafusion::execution::context::{default_session_builder, SessionState};
-
-use datafusion::prelude::SessionConfig;
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_proto::logical_plan::AsLogicalPlan;
 
-use log::error;
-
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, 
SchedulerServerEvent};
-use crate::scheduler_server::event_loop::SchedulerServerEventAction;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 use crate::state::backend::StateBackendClient;
 use crate::state::executor_manager::ExecutorReservation;
@@ -44,7 +41,6 @@ pub mod externalscaler {
 }
 
 pub mod event;
-mod event_loop;
 mod external_scaler;
 mod grpc;
 mod query_stage_scheduler;
@@ -57,7 +53,6 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
     pub(crate) state: Arc<SchedulerState<T, U>>,
     pub start_time: u128,
     policy: TaskSchedulingPolicy,
-    event_loop: Option<EventLoop<SchedulerServerEvent>>,
     pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
 }
 
@@ -105,30 +100,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             scheduler_name.clone(),
         ));
 
-        let event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>> =
-            if matches!(policy, TaskSchedulingPolicy::PushStaged) {
-                Some(Arc::new(SchedulerServerEventAction::new(state.clone())))
-            } else {
-                None
-            };
-        SchedulerServer::new_with_event_action(scheduler_name, state, 
event_action)
+        SchedulerServer::new_with_state(scheduler_name, policy, state)
     }
 
-    fn new_with_event_action(
+    pub(crate) fn new_with_state(
         scheduler_name: String,
+        policy: TaskSchedulingPolicy,
         state: Arc<SchedulerState<T, U>>,
-        event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>>,
     ) -> Self {
-        let event_loop = event_action.map(|event_action| {
-            EventLoop::new("scheduler".to_owned(), 10000, event_action)
-        });
-        let policy = if event_loop.is_some() {
-            TaskSchedulingPolicy::PushStaged
-        } else {
-            TaskSchedulingPolicy::PullStaged
-        };
         let query_stage_scheduler =
-            Arc::new(QueryStageScheduler::new(state.clone(), None));
+            Arc::new(QueryStageScheduler::new(state.clone(), policy));
         let query_stage_event_loop =
             EventLoop::new("query_stage".to_owned(), 10000, 
query_stage_scheduler);
         Self {
@@ -139,97 +120,56 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
                 .unwrap()
                 .as_millis(),
             policy,
-            event_loop,
             query_stage_event_loop,
         }
     }
 
     pub async fn init(&mut self) -> Result<()> {
-        {
-            // initialize state
-            self.state.init().await?;
-        }
-
-        {
-            if let Some(event_loop) = self.event_loop.as_mut() {
-                event_loop.start()?;
-
-                let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
-                    self.state.clone(),
-                    Some(event_loop.get_sender()?),
-                ));
-                let query_stage_event_loop = EventLoop::new(
-                    self.query_stage_event_loop.name.clone(),
-                    self.query_stage_event_loop.buffer_size,
-                    query_stage_scheduler,
-                );
-                self.query_stage_event_loop = query_stage_event_loop;
-            }
-
-            self.query_stage_event_loop.start()?;
-        }
+        self.state.init().await?;
+        self.query_stage_event_loop.start()?;
 
         Ok(())
     }
 
+    pub(crate) async fn submit_job(
+        &self,
+        job_id: &str,
+        ctx: Arc<SessionContext>,
+        plan: &LogicalPlan,
+    ) -> Result<()> {
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::JobQueued {
+                job_id: job_id.to_owned(),
+                session_ctx: ctx,
+                plan: Box::new(plan.clone()),
+            })
+            .await
+    }
+
+    /// It just send task status update event to the channel,
+    /// and will not guarantee the event processing completed after return
     pub(crate) async fn update_task_status(
         &self,
         executor_id: &str,
         tasks_status: Vec<TaskStatus>,
     ) -> Result<()> {
-        let num_status = tasks_status.len();
-        let executor = self
-            .state
-            .executor_manager
-            .get_executor_metadata(executor_id)
-            .await?;
-
-        match self
-            .state
-            .task_manager
-            .update_task_statuses(&executor, tasks_status)
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(QueryStageSchedulerEvent::TaskUpdating(
+                executor_id.to_owned(),
+                tasks_status,
+            ))
             .await
-        {
-            Ok((stage_events, offers)) => {
-                if let Some(event_loop) = self.event_loop.as_ref() {
-                    event_loop
-                        .get_sender()?
-                        .post_event(SchedulerServerEvent::Offer(offers))
-                        .await?;
-                }
-
-                for stage_event in stage_events {
-                    self.post_stage_event(stage_event).await?;
-                }
-            }
-            Err(e) => {
-                error!(
-                    "Failed to update {} task statuses for executor {}: {:?}",
-                    num_status, executor_id, e
-                );
-                // In case task update fails, make sure to free reservations
-                if let Some(event_loop) = self.event_loop.as_ref() {
-                    let mut reservations = vec![];
-                    for _ in 0..num_status {
-                        reservations
-                            
.push(ExecutorReservation::new_free(executor_id.to_owned()));
-                    }
-
-                    event_loop
-                        .get_sender()?
-                        .post_event(SchedulerServerEvent::Offer(reservations))
-                        .await?;
-                }
-            }
-        }
-
-        Ok(())
     }
 
-    async fn post_stage_event(&self, event: QueryStageSchedulerEvent) -> 
Result<()> {
+    pub(crate) async fn offer_reservation(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> Result<()> {
         self.query_stage_event_loop
             .get_sender()?
-            .post_event(event)
+            
.post_event(QueryStageSchedulerEvent::ReservationOffering(reservations))
             .await
     }
 }
@@ -249,8 +189,7 @@ mod test {
     use ballista_core::config::{
         BallistaConfig, TaskSchedulingPolicy, 
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
     };
-    use ballista_core::error::{BallistaError, Result};
-    use ballista_core::event_loop::EventAction;
+    use ballista_core::error::Result;
 
     use ballista_core::serde::protobuf::{
         job_status, task_status, CompletedTask, FailedTask, JobStatus, 
PartitionId,
@@ -261,17 +200,12 @@ mod test {
     };
     use ballista_core::serde::BallistaCodec;
 
-    use crate::scheduler_server::event::{
-        QueryStageSchedulerEvent, SchedulerServerEvent,
-    };
     use crate::scheduler_server::SchedulerServer;
     use crate::state::backend::standalone::StandaloneClient;
 
     use crate::state::executor_manager::ExecutorReservation;
     use crate::state::SchedulerState;
-    use crate::test_utils::{
-        await_condition, ExplodingTableProvider, SchedulerEventObserver,
-    };
+    use crate::test_utils::{await_condition, ExplodingTableProvider};
 
     #[tokio::test]
     async fn test_pull_scheduling() -> Result<()> {
@@ -297,33 +231,12 @@ mod test {
             .create_session(&config)
             .await?;
 
-        let plan = async {
-            let optimized_plan = ctx.optimize(&plan).map_err(|e| {
-                BallistaError::General(format!(
-                    "Could not create optimized logical plan: {}",
-                    e
-                ))
-            })?;
-
-            ctx.create_physical_plan(&optimized_plan)
-                .await
-                .map_err(|e| {
-                    BallistaError::General(format!(
-                        "Could not create physical plan: {}",
-                        e
-                    ))
-                })
-        }
-        .await?;
-
         let job_id = "job";
-        let session_id = ctx.session_id();
 
         // Submit job
         scheduler
             .state
-            .task_manager
-            .submit_job(job_id, &session_id, plan)
+            .submit_job(job_id, ctx, &plan)
             .await
             .expect("submitting plan");
 
@@ -371,7 +284,8 @@ mod test {
                 };
 
                 scheduler
-                    .update_task_status("executor-1", vec![task_status])
+                    .state
+                    .update_task_statuses("executor-1", vec![task_status])
                     .await?;
             } else {
                 break;
@@ -405,13 +319,7 @@ mod test {
         let plan = test_plan();
         let task_slots = 4;
 
-        let (sender, mut event_receiver) =
-            tokio::sync::mpsc::channel::<SchedulerServerEvent>(1000);
-        let (error_sender, _) = 
tokio::sync::mpsc::channel::<BallistaError>(1000);
-
-        let event_action = SchedulerEventObserver::new(sender, error_sender);
-
-        let scheduler = 
test_scheduler_with_event_action(Arc::new(event_action)).await?;
+        let scheduler = test_push_staged_scheduler().await?;
 
         let executors = test_executors(task_slots);
         for (executor_metadata, executor_data) in executors {
@@ -432,21 +340,39 @@ mod test {
 
         let job_id = "job";
 
-        // Send JobQueued event to kick off the event loop
-        scheduler
-            .query_stage_event_loop
-            .get_sender()?
-            .post_event(QueryStageSchedulerEvent::JobQueued {
-                job_id: job_id.to_owned(),
-                session_ctx: ctx,
-                plan: Box::new(plan),
-            })
-            .await?;
+        scheduler.state.submit_job(job_id, ctx, &plan).await?;
 
         // Complete tasks that are offered through scheduler events
-        while let Some(SchedulerServerEvent::Offer(reservations)) =
-            event_receiver.recv().await
-        {
+        loop {
+            // Check condition
+            let available_tasks = {
+                let graph = scheduler
+                    .state
+                    .task_manager
+                    .get_active_execution_graph(job_id)
+                    .await
+                    .unwrap();
+                let graph = graph.read().await;
+                if graph.complete() {
+                    break;
+                }
+                graph.available_tasks()
+            };
+
+            if available_tasks == 0 {
+                tokio::time::sleep(Duration::from_millis(5)).await;
+                continue;
+            }
+
+            let reservations: Vec<ExecutorReservation> = scheduler
+                .state
+                .executor_manager
+                .reserve_slots(available_tasks as u32)
+                .await?
+                .into_iter()
+                .map(|res| res.assign(job_id.to_owned()))
+                .collect();
+
             let free_list = match scheduler
                 .state
                 .task_manager
@@ -454,11 +380,6 @@ mod test {
                 .await
             {
                 Ok((assignments, mut unassigned_reservations, _)) => {
-                    // Break when we are no longer assigning tasks
-                    if unassigned_reservations.len() == reservations.len() {
-                        break;
-                    }
-
                     for (executor_id, task) in assignments.into_iter() {
                         match scheduler
                             .state
@@ -544,13 +465,7 @@ mod test {
         let plan = test_plan();
         let task_slots = 4;
 
-        let (sender, mut event_receiver) =
-            tokio::sync::mpsc::channel::<SchedulerServerEvent>(1000);
-        let (error_sender, _) = 
tokio::sync::mpsc::channel::<BallistaError>(1000);
-
-        let event_action = SchedulerEventObserver::new(sender, error_sender);
-
-        let scheduler = 
test_scheduler_with_event_action(Arc::new(event_action)).await?;
+        let scheduler = test_push_staged_scheduler().await?;
 
         let executors = test_executors(task_slots);
         for (executor_metadata, executor_data) in executors {
@@ -571,94 +486,92 @@ mod test {
 
         let job_id = "job";
 
-        // Send JobQueued event to kick off the event loop
-        scheduler
-            .query_stage_event_loop
-            .get_sender()?
-            .post_event(QueryStageSchedulerEvent::JobQueued {
-                job_id: job_id.to_owned(),
-                session_ctx: ctx,
-                plan: Box::new(plan),
-            })
+        scheduler.state.submit_job(job_id, ctx, &plan).await?;
+
+        let available_tasks = scheduler
+            .state
+            .task_manager
+            .get_available_task_count(job_id)
             .await?;
 
+        let reservations: Vec<ExecutorReservation> = scheduler
+            .state
+            .executor_manager
+            .reserve_slots(available_tasks as u32)
+            .await?
+            .into_iter()
+            .map(|res| res.assign(job_id.to_owned()))
+            .collect();
+
         // Complete tasks that are offered through scheduler events
-        if let Some(SchedulerServerEvent::Offer(reservations)) =
-            event_receiver.recv().await
+        let free_list = match scheduler
+            .state
+            .task_manager
+            .fill_reservations(&reservations)
+            .await
         {
-            let free_list = match scheduler
-                .state
-                .task_manager
-                .fill_reservations(&reservations)
-                .await
-            {
-                Ok((assignments, mut unassigned_reservations, _)) => {
-                    for (executor_id, task) in assignments.into_iter() {
-                        match scheduler
-                            .state
-                            .executor_manager
-                            .get_executor_metadata(&executor_id)
-                            .await
-                        {
-                            Ok(executor) => {
-                                let mut partitions: Vec<ShuffleWritePartition> 
= vec![];
-
-                                let num_partitions = task
-                                    .output_partitioning
-                                    .map(|p| p.partition_count())
-                                    .unwrap_or(1);
-
-                                for partition_id in 0..num_partitions {
-                                    partitions.push(ShuffleWritePartition {
-                                        partition_id: partition_id as u64,
-                                        path: "some/path".to_string(),
-                                        num_batches: 1,
-                                        num_rows: 1,
-                                        num_bytes: 1,
-                                    })
-                                }
-
-                                // Complete the task
-                                let task_status = TaskStatus {
-                                    status: Some(task_status::Status::Failed(
-                                        FailedTask {
-                                            error: "".to_string(),
-                                        },
-                                    )),
-                                    metrics: vec![],
-                                    task_id: Some(PartitionId {
-                                        job_id: job_id.to_owned(),
-                                        stage_id: task.partition.stage_id as 
u32,
-                                        partition_id: 
task.partition.partition_id as u32,
-                                    }),
-                                };
-
-                                scheduler
-                                    .update_task_status(&executor.id, 
vec![task_status])
-                                    .await?;
-                            }
-                            Err(_e) => {
-                                unassigned_reservations.push(
-                                    
ExecutorReservation::new_free(executor_id.clone()),
-                                );
+            Ok((assignments, mut unassigned_reservations, _)) => {
+                for (executor_id, task) in assignments.into_iter() {
+                    match scheduler
+                        .state
+                        .executor_manager
+                        .get_executor_metadata(&executor_id)
+                        .await
+                    {
+                        Ok(executor) => {
+                            let mut partitions: Vec<ShuffleWritePartition> = 
vec![];
+
+                            let num_partitions = task
+                                .output_partitioning
+                                .map(|p| p.partition_count())
+                                .unwrap_or(1);
+
+                            for partition_id in 0..num_partitions {
+                                partitions.push(ShuffleWritePartition {
+                                    partition_id: partition_id as u64,
+                                    path: "some/path".to_string(),
+                                    num_batches: 1,
+                                    num_rows: 1,
+                                    num_bytes: 1,
+                                })
                             }
+
+                            // Complete the task
+                            let task_status = TaskStatus {
+                                status: 
Some(task_status::Status::Failed(FailedTask {
+                                    error: "".to_string(),
+                                })),
+                                metrics: vec![],
+                                task_id: Some(PartitionId {
+                                    job_id: job_id.to_owned(),
+                                    stage_id: task.partition.stage_id as u32,
+                                    partition_id: task.partition.partition_id 
as u32,
+                                }),
+                            };
+
+                            scheduler
+                                .state
+                                .update_task_statuses(&executor.id, 
vec![task_status])
+                                .await?;
+                        }
+                        Err(_e) => {
+                            unassigned_reservations
+                                
.push(ExecutorReservation::new_free(executor_id.clone()));
                         }
                     }
-                    unassigned_reservations
                 }
-                Err(_e) => reservations,
-            };
-
-            // If any reserved slots remain, return them to the pool
-            if !free_list.is_empty() {
-                scheduler
-                    .state
-                    .executor_manager
-                    .cancel_reservations(free_list)
-                    .await?;
+                unassigned_reservations
             }
-        } else {
-            panic!("No reservations offered");
+            Err(_e) => reservations,
+        };
+
+        // If any reserved slots remain, return them to the pool
+        if !free_list.is_empty() {
+            scheduler
+                .state
+                .executor_manager
+                .cancel_reservations(free_list)
+                .await?;
         }
 
         let status = 
scheduler.state.task_manager.get_job_status(job_id).await?;
@@ -682,13 +595,7 @@ mod test {
     async fn test_planning_failure() -> Result<()> {
         let task_slots = 4;
 
-        let (sender, _event_receiver) =
-            tokio::sync::mpsc::channel::<SchedulerServerEvent>(1000);
-        let (error_sender, _) = 
tokio::sync::mpsc::channel::<BallistaError>(1000);
-
-        let event_action = SchedulerEventObserver::new(sender, error_sender);
-
-        let scheduler = 
test_scheduler_with_event_action(Arc::new(event_action)).await?;
+        let scheduler = test_push_staged_scheduler().await?;
 
         let config = test_session(task_slots);
 
@@ -704,17 +611,8 @@ mod test {
 
         let job_id = "job";
 
-        // Send JobQueued event to kick off the event loop
         // This should fail when we try and create the physical plan
-        scheduler
-            .query_stage_event_loop
-            .get_sender()?
-            .post_event(QueryStageSchedulerEvent::JobQueued {
-                job_id: job_id.to_owned(),
-                session_ctx: ctx,
-                plan: Box::new(plan),
-            })
-            .await?;
+        scheduler.submit_job(job_id, ctx, &plan).await?;
 
         let scheduler = scheduler.clone();
 
@@ -754,8 +652,7 @@ mod test {
         Ok(scheduler)
     }
 
-    async fn test_scheduler_with_event_action(
-        event_action: Arc<dyn EventAction<SchedulerServerEvent>>,
+    async fn test_push_staged_scheduler(
     ) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
         let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
         let state = Arc::new(SchedulerState::new_with_default_scheduler_name(
@@ -764,10 +661,10 @@ mod test {
             BallistaCodec::default(),
         ));
         let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
-            SchedulerServer::new_with_event_action(
+            SchedulerServer::new_with_state(
                 "localhost:50050".to_owned(),
+                TaskSchedulingPolicy::PushStaged,
                 state,
-                Some(event_action),
             );
         scheduler.init().await?;
 
diff --git 
a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs 
b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 1c403ea6..53fe38f5 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -16,21 +16,19 @@
 // under the License.
 
 use std::sync::Arc;
-use std::time::Instant;
 
 use async_trait::async_trait;
-use datafusion::logical_plan::LogicalPlan;
-use datafusion::prelude::SessionContext;
 use log::{debug, error, info};
 
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::{EventAction, EventSender};
 
+use ballista_core::config::TaskSchedulingPolicy;
 use ballista_core::serde::AsExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use tokio::sync::mpsc;
 
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, 
SchedulerServerEvent};
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
 use crate::state::executor_manager::ExecutorReservation;
 use crate::state::SchedulerState;
@@ -40,18 +38,15 @@ pub(crate) struct QueryStageScheduler<
     U: 'static + AsExecutionPlan,
 > {
     state: Arc<SchedulerState<T, U>>,
-    event_sender: Option<EventSender<SchedulerServerEvent>>,
+    policy: TaskSchedulingPolicy,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> 
QueryStageScheduler<T, U> {
     pub(crate) fn new(
         state: Arc<SchedulerState<T, U>>,
-        event_sender: Option<EventSender<SchedulerServerEvent>>,
+        policy: TaskSchedulingPolicy,
     ) -> Self {
-        Self {
-            state,
-            event_sender,
-        }
+        Self { state, policy }
     }
 }
 
@@ -73,6 +68,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
         tx_event: &mpsc::Sender<QueryStageSchedulerEvent>,
         _rx_event: &mpsc::Receiver<QueryStageSchedulerEvent>,
     ) -> Result<()> {
+        let tx_event = EventSender::new(tx_event.clone());
         match event {
             QueryStageSchedulerEvent::JobQueued {
                 job_id,
@@ -81,11 +77,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
             } => {
                 info!("Job {} queued", job_id);
                 let state = self.state.clone();
-                let tx_event = tx_event.clone();
                 tokio::spawn(async move {
                     let event = if let Err(e) =
-                        submit_job(state.clone(), job_id.clone(), session_ctx, 
&plan)
-                            .await
+                        state.submit_job(&job_id, session_ctx, &plan).await
                     {
                         let msg = format!("Error planning job {}: {:?}", 
job_id, e);
                         error!("{}", &msg);
@@ -94,17 +88,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         QueryStageSchedulerEvent::JobSubmitted(job_id)
                     };
                     tx_event
-                        .send(event)
+                        .post_event(event)
                         .await
-                        .map_err(|e| {
-                            error!("Fail to send event due to {}", e);
-                        })
+                        .map_err(|e| error!("Fail to send event due to {}", e))
                         .unwrap();
                 });
             }
             QueryStageSchedulerEvent::JobSubmitted(job_id) => {
                 info!("Job {} submitted", job_id);
-                if let Some(sender) = &self.event_sender {
+                if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
                     let available_tasks = self
                         .state
                         .task_manager
@@ -126,16 +118,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
                         job_id
                     );
 
-                    if let Err(e) = sender
-                        
.post_event(SchedulerServerEvent::Offer(reservations.clone()))
-                        .await
-                    {
-                        error!("Error posting offer: {:?}", e);
-                        self.state
-                            .executor_manager
-                            .cancel_reservations(reservations)
-                            .await?;
-                    }
+                    tx_event
+                        
.post_event(QueryStageSchedulerEvent::ReservationOffering(
+                            reservations,
+                        ))
+                        .await?;
                 }
             }
             QueryStageSchedulerEvent::JobPlanningFailed(job_id, fail_message) 
=> {
@@ -157,6 +144,56 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                 error!("Job {} Updated", job_id);
                 self.state.task_manager.update_job(&job_id).await?;
             }
+            QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status) => {
+                let num_status = tasks_status.len();
+                match self
+                    .state
+                    .update_task_statuses(&executor_id, tasks_status)
+                    .await
+                {
+                    Ok((stage_events, offers)) => {
+                        if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
+                            tx_event
+                                .post_event(
+                                    QueryStageSchedulerEvent::ReservationOffering(offers),
+                                )
+                                .await?;
+                        }
+
+                        for stage_event in stage_events {
+                            tx_event.post_event(stage_event).await?;
+                        }
+                    }
+                    Err(e) => {
+                        error!(
+                            "Failed to update {} task statuses for executor {}: {:?}",
+                            num_status, executor_id, e
+                        );
+                        // Offer back the slots these tasks held (as the Ok branch does);
+                        // otherwise they may never be returned to the pool of free slots.
+                        if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
+                            let free_slots: Vec<ExecutorReservation> = (0..num_status)
+                                .map(|_| {
+                                    ExecutorReservation::new_free(executor_id.clone())
+                                })
+                                .collect();
+                            let event =
+                                QueryStageSchedulerEvent::ReservationOffering(free_slots);
+                            tx_event.post_event(event).await?;
+                        }
+                    }
+                }
+            }
+            QueryStageSchedulerEvent::ReservationOffering(reservations) => {
+                let reservations = self.state.offer_reservation(reservations).await?;
+                if !reservations.is_empty() {
+                    tx_event
+                        .post_event(QueryStageSchedulerEvent::ReservationOffering(
+                            reservations,
+                        ))
+                        .await?;
+                }
+            }
         }
 
         Ok(())
@@ -166,28 +192,3 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan>
         error!("Error received by QueryStageScheduler: {:?}", error);
     }
 }
-
-async fn submit_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
-    state: Arc<SchedulerState<T, U>>,
-    job_id: String,
-    session_ctx: Arc<SessionContext>,
-    plan: &LogicalPlan,
-) -> Result<()> {
-    let start = Instant::now();
-    let optimized_plan = session_ctx.optimize(plan)?;
-
-    debug!("Calculated optimized plan: {:?}", optimized_plan);
-
-    let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
-
-    state
-        .task_manager
-        .submit_job(&job_id, &session_ctx.session_id(), plan.clone())
-        .await?;
-
-    let elapsed = start.elapsed();
-
-    info!("Planned job {} in {:?}", job_id, elapsed);
-
-    Ok(())
-}
diff --git a/ballista/rust/scheduler/src/state/mod.rs 
b/ballista/rust/scheduler/src/state/mod.rs
index 5668c6b1..43cb25e6 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -17,24 +17,25 @@
 
 use std::any::type_name;
 use std::future::Future;
-
 use std::sync::Arc;
+use std::time::Instant;
 
-use prost::Message;
-
-use ballista_core::error::{BallistaError, Result};
-
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
 use crate::scheduler_server::SessionBuilder;
-
-use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion_proto::logical_plan::AsLogicalPlan;
-
 use crate::state::backend::{Lock, StateBackendClient};
-
-use crate::state::executor_manager::ExecutorManager;
+use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::TaskManager;
 
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::protobuf::TaskStatus;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, error, info};
+use prost::Message;
+
 pub mod backend;
 pub mod execution_graph;
 pub mod executor_manager;
@@ -122,6 +123,151 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
     pub async fn init(&self) -> Result<()> {
         self.executor_manager.init().await
     }
+
+    #[cfg(not(test))]
+    pub(crate) async fn update_task_statuses(
+        &self,
+        executor_id: &str,
+        tasks_status: Vec<TaskStatus>,
+    ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
+        let executor = self
+            .executor_manager
+            .get_executor_metadata(executor_id)
+            .await?;
+
+        let total_num_tasks = tasks_status.len();
+        let reservations = (0..total_num_tasks)
+            .into_iter()
+            .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
+            .collect();
+
+        let events = self
+            .task_manager
+            .update_task_statuses(&executor, tasks_status)
+            .await?;
+
+        Ok((events, reservations))
+    }
+
+    #[cfg(test)]
+    pub(crate) async fn update_task_statuses(
+        &self,
+        executor_id: &str,
+        tasks_status: Vec<TaskStatus>,
+    ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
+        let executor = self
+            .executor_manager
+            .get_executor_metadata(executor_id)
+            .await?;
+
+        let total_num_tasks = tasks_status.len();
+        let free_list = (0..total_num_tasks)
+            .into_iter()
+            .map(|_| ExecutorReservation::new_free(executor_id.to_owned()))
+            .collect();
+
+        let events = self
+            .task_manager
+            .update_task_statuses(&executor, tasks_status)
+            .await?;
+
+        self.executor_manager.cancel_reservations(free_list).await?;
+
+        Ok((events, vec![]))
+    }
+
+    /// Process offered reservations and return any newly reserved slots to offer next:
+    /// 1. Attempt to fill the offered reservations with available tasks
+    /// 2. For any reservation that filled, launch the assigned task on the executor.
+    /// 3. For any reservations that could not be filled, cancel the reservation (i.e. return the
+    ///    task slot to the pool). If all were filled but tasks remain pending, reserve more slots.
+    ///
+    /// NOTE Error handling in this method is very important. No matter what we need to ensure
+    /// that unfilled reservations are cancelled or else they could become permanently "invisible"
+    /// to the scheduler.
+    pub(crate) async fn offer_reservation(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> Result<Vec<ExecutorReservation>> {
+        let (free_list, pending_tasks) = match self
+            .task_manager
+            .fill_reservations(&reservations)
+            .await
+        {
+            Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
+                for (executor_id, task) in assignments.into_iter() {
+                    match self
+                        .executor_manager
+                        .get_executor_metadata(&executor_id)
+                        .await
+                    {
+                        Ok(executor) => {
+                            if let Err(e) =
+                                self.task_manager.launch_task(&executor, task).await
+                            {
+                                error!("Failed to launch new task: {:?}", e);
+                                unassigned_reservations.push(
+                                    ExecutorReservation::new_free(executor_id.clone()),
+                                );
+                            }
+                        }
+                        Err(e) => {
+                            error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                            unassigned_reservations
+                                .push(ExecutorReservation::new_free(executor_id.clone()));
+                        }
+                    }
+                }
+                (unassigned_reservations, pending_tasks)
+            }
+            Err(e) => {
+                error!("Error filling reservations: {:?}", e);
+                (reservations, 0)
+            }
+        };
+
+        debug!("Unfilled reservations to cancel: {:?}", free_list);
+        debug!("Pending tasks after filling reservations: {}", pending_tasks);
+
+        let mut new_reservations = vec![];
+        if !free_list.is_empty() {
+            // If any reserved slots remain, return them to the pool
+            self.executor_manager.cancel_reservations(free_list).await?;
+        } else if pending_tasks > 0 {
+            // If there are pending tasks available, try and schedule them
+            let pending_reservations = self
+                .executor_manager
+                .reserve_slots(pending_tasks as u32)
+                .await?;
+            new_reservations.extend(pending_reservations);
+        }
+
+        Ok(new_reservations)
+    }
+
+    pub(crate) async fn submit_job(
+        &self,
+        job_id: &str,
+        session_ctx: Arc<SessionContext>,
+        plan: &LogicalPlan,
+    ) -> Result<()> {
+        let start = Instant::now();
+        let optimized_plan = session_ctx.optimize(plan)?;
+
+        debug!("Calculated optimized plan: {:?}", optimized_plan);
+
+        let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
+
+        self.task_manager
+            .submit_job(job_id, &session_ctx.session_id(), plan)
+            .await?;
+
+        let elapsed = start.elapsed();
+
+        info!("Planned job {} in {:?}", job_id, elapsed);
+
+        Ok(())
+    }
 }
 
 pub async fn with_lock<Out, F: Future<Output = Out>>(lock: Box<dyn Lock>, op: 
F) -> Out {
@@ -132,5 +278,242 @@ pub async fn with_lock<Out, F: Future<Output = Out>>(lock: Box<dyn Lock>, op: F)
     result
 }
 
-#[cfg(all(test, feature = "sled"))]
-mod test {}
+#[cfg(test)]
+mod test {
+    use crate::state::backend::standalone::StandaloneClient;
+    use crate::state::SchedulerState;
+    use ballista_core::config::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
+    use ballista_core::error::Result;
+    use ballista_core::serde::protobuf::{
+        task_status, CompletedTask, PartitionId, PhysicalPlanNode, ShuffleWritePartition,
+        TaskStatus,
+    };
+    use ballista_core::serde::scheduler::{
+        ExecutorData, ExecutorMetadata, ExecutorSpecification,
+    };
+    use ballista_core::serde::BallistaCodec;
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::execution::context::default_session_builder;
+    use datafusion::logical_expr::{col, sum};
+    use datafusion::physical_plan::ExecutionPlan;
+    use datafusion::prelude::SessionContext;
+    use datafusion::test_util::scan_empty;
+    use datafusion_proto::protobuf::LogicalPlanNode;
+    use std::sync::Arc;
+
+    /// Reservations that no pending task can use must be handed back to the pool.
+    #[tokio::test]
+    async fn test_offer_free_reservations() -> Result<()> {
+        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
+            Arc::new(SchedulerState::new_with_default_scheduler_name(
+                state_storage,
+                default_session_builder,
+                BallistaCodec::default(),
+            ));
+
+        let executors = test_executors(1, 4);
+
+        let (executor_metadata, executor_data) = executors[0].clone();
+
+        let reservations = state
+            .executor_manager
+            .register_executor(executor_metadata, executor_data, true)
+            .await?;
+
+        let result = state.offer_reservation(reservations).await?;
+
+        assert!(result.is_empty());
+
+        // All reservations should have been cancelled so we should be able to reserve them now
+        let reservations = state.executor_manager.reserve_slots(4).await?;
+
+        assert_eq!(reservations.len(), 4);
+
+        Ok(())
+    }
+
+    /// Unbound reservations should be filled with any available pending task.
+    #[tokio::test]
+    async fn test_offer_fill_reservations() -> Result<()> {
+        let config = BallistaConfig::builder()
+            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
+            .build()?;
+        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
+            Arc::new(SchedulerState::new_with_default_scheduler_name(
+                state_storage,
+                default_session_builder,
+                BallistaCodec::default(),
+            ));
+
+        let session_ctx = state.session_manager.create_session(&config).await?;
+
+        let plan = test_graph(session_ctx.clone()).await;
+
+        // Create 4 jobs so we have four pending tasks
+        for job_id in ["job-1", "job-2", "job-3", "job-4"] {
+            state
+                .task_manager
+                .submit_job(job_id, session_ctx.session_id().as_str(), plan.clone())
+                .await?;
+        }
+
+        let executors = test_executors(1, 4);
+
+        let (executor_metadata, executor_data) = executors[0].clone();
+
+        let reservations = state
+            .executor_manager
+            .register_executor(executor_metadata, executor_data, true)
+            .await?;
+
+        let result = state.offer_reservation(reservations).await?;
+
+        assert!(result.is_empty());
+
+        // All task slots should be assigned so we should not be able to reserve more tasks
+        let reservations = state.executor_manager.reserve_slots(4).await?;
+
+        assert_eq!(reservations.len(), 0);
+
+        Ok(())
+    }
+
+    /// Tasks that are still pending after an offer should trigger new reservations.
+    #[tokio::test]
+    async fn test_offer_resubmit_pending() -> Result<()> {
+        let config = BallistaConfig::builder()
+            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
+            .build()?;
+        let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
+        let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
+            Arc::new(SchedulerState::new_with_default_scheduler_name(
+                state_storage,
+                default_session_builder,
+                BallistaCodec::default(),
+            ));
+
+        let session_ctx = state.session_manager.create_session(&config).await?;
+
+        let plan = test_graph(session_ctx.clone()).await;
+
+        // Create a job
+        state
+            .task_manager
+            .submit_job("job-1", session_ctx.session_id().as_str(), plan.clone())
+            .await?;
+
+        let executors = test_executors(1, 4);
+
+        let (executor_metadata, executor_data) = executors[0].clone();
+
+        // Complete the first stage, which should leave 4 pending tasks in stage 2 of this job
+        let partitions: Vec<ShuffleWritePartition> = (0..4u64)
+            .map(|partition_id| ShuffleWritePartition {
+                partition_id,
+                path: "some/path".to_string(),
+                num_batches: 1,
+                num_rows: 1,
+                num_bytes: 1,
+            })
+            .collect();
+
+        state
+            .task_manager
+            .update_task_statuses(
+                &executor_metadata,
+                vec![TaskStatus {
+                    task_id: Some(PartitionId {
+                        job_id: "job-1".to_string(),
+                        stage_id: 1,
+                        partition_id: 0,
+                    }),
+                    metrics: vec![],
+                    status: Some(task_status::Status::Completed(CompletedTask {
+                        // Report completion from the executor registered below
+                        // rather than a hard-coded id that matches no executor.
+                        executor_id: executor_metadata.id.clone(),
+                        partitions,
+                    })),
+                }],
+            )
+            .await?;
+
+        state
+            .executor_manager
+            .register_executor(executor_metadata, executor_data, false)
+            .await?;
+
+        let reservations = state.executor_manager.reserve_slots(1).await?;
+
+        assert_eq!(reservations.len(), 1);
+
+        // Offer the reservation. It should be filled with one of the 4 pending tasks;
+        // the other 3 tasks should get fresh reservations, emitting another offer event
+        let reservations = state.offer_reservation(reservations).await?;
+
+        assert_eq!(reservations.len(), 3);
+
+        // Remaining 3 task slots should be reserved for pending tasks
+        let reservations = state.executor_manager.reserve_slots(4).await?;
+
+        assert_eq!(reservations.len(), 0);
+
+        Ok(())
+    }
+
+    /// Builds `total_executors` executors with ids `executor-{i}`, each advertising
+    /// `slots_per_executor` task slots, all of which are available.
+    fn test_executors(
+        total_executors: usize,
+        slots_per_executor: u32,
+    ) -> Vec<(ExecutorMetadata, ExecutorData)> {
+        (0..total_executors)
+            .map(|i| {
+                let executor_id = format!("executor-{}", i);
+                (
+                    ExecutorMetadata {
+                        id: executor_id.clone(),
+                        host: format!("host-{}", i),
+                        port: 8080,
+                        grpc_port: 9090,
+                        specification: ExecutorSpecification {
+                            task_slots: slots_per_executor,
+                        },
+                    },
+                    ExecutorData {
+                        executor_id,
+                        total_task_slots: slots_per_executor,
+                        available_task_slots: slots_per_executor,
+                    },
+                )
+            })
+            .collect()
+    }
+
+    /// Physical plan for `SUM(gmv)` grouped by `id` over an empty two-column table.
+    async fn test_graph(ctx: Arc<SessionContext>) -> Arc<dyn ExecutionPlan> {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Utf8, false),
+            Field::new("gmv", DataType::UInt64, false),
+        ]);
+
+        let plan = scan_empty(None, &schema, Some(vec![0, 1]))
+            .unwrap()
+            .aggregate(vec![col("id")], vec![sum(col("gmv"))])
+            .unwrap()
+            .build()
+            .unwrap();
+
+        ctx.create_physical_plan(&plan).await.unwrap()
+    }
+}
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index 480f9369..25357f38 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -133,7 +133,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         &self,
         executor: &ExecutorMetadata,
         task_status: Vec<TaskStatus>,
-    ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
+    ) -> Result<Vec<QueryStageSchedulerEvent>> {
         let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
         for status in task_status {
             debug!("Task Update\n{:?}", status);
@@ -147,13 +147,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         }
 
         let mut events: Vec<QueryStageSchedulerEvent> = vec![];
-        let mut total_num_tasks = 0;
         for (job_id, statuses) in job_updates {
             let num_tasks = statuses.len();
             debug!("Updating {} tasks in job {}", num_tasks, job_id);
 
-            total_num_tasks += num_tasks;
-
             let graph = self.get_active_execution_graph(&job_id).await;
             let job_event = if let Some(graph) = graph {
                 let mut graph = graph.write().await;
@@ -169,11 +166,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
             }
         }
 
-        let reservation = (0..total_num_tasks)
-            .into_iter()
-            .map(|_| ExecutorReservation::new_free(executor.id.to_owned()))
-            .collect();
-        Ok((events, reservation))
+        Ok(events)
     }
 
     /// Take a list of executor reservations and fill them with tasks that are 
ready
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index c60e8ff3..c115ce72 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -15,16 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista_core::error::{BallistaError, Result};
+use ballista_core::error::Result;
 use std::any::Any;
 use std::future::Future;
 use std::sync::Arc;
 use std::time::Duration;
 
-use crate::scheduler_server::event::SchedulerServerEvent;
-
 use async_trait::async_trait;
-use ballista_core::event_loop::EventAction;
 
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::common::DataFusionError;
@@ -33,50 +30,11 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::CsvReadOptions;
-use tokio::sync::mpsc::{Receiver, Sender};
 
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", 
"nation", "region",
 ];
 
-/// Test utility that allows observing scheduler events.
-pub struct SchedulerEventObserver {
-    sender: Sender<SchedulerServerEvent>,
-    errors: Sender<BallistaError>,
-}
-
-impl SchedulerEventObserver {
-    pub fn new(
-        sender: Sender<SchedulerServerEvent>,
-        errors: Sender<BallistaError>,
-    ) -> Self {
-        Self { sender, errors }
-    }
-}
-
-#[async_trait]
-impl EventAction<SchedulerServerEvent> for SchedulerEventObserver {
-    fn on_start(&self) {}
-
-    fn on_stop(&self) {}
-
-    async fn on_receive(
-        &self,
-        event: SchedulerServerEvent,
-        _tx_event: &Sender<SchedulerServerEvent>,
-        _rx_event: &Receiver<SchedulerServerEvent>,
-    ) -> Result<()> {
-        self.sender.send(event).await.unwrap();
-
-        Ok(())
-    }
-
-    fn on_error(&self, error: BallistaError) {
-        let errors = self.errors.clone();
-        tokio::task::spawn(async move { errors.send(error).await.unwrap() });
-    }
-}
-
 /// Sometimes we need to construct logical plans that will produce errors
 /// when we try and create physical plan. A scan using `ExplodingTableProvider`
 /// will do the trick

Reply via email to