This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new f19c0fb8 Remove revive offer event loop (#156)
f19c0fb8 is described below
commit f19c0fb88edd5e3ac52ab3e5cdb6c5f87c63b2e7
Author: yahoNanJing <[email protected]>
AuthorDate: Thu Sep 1 20:59:39 2022 +0800
Remove revive offer event loop (#156)
* Add TaskUpdating event for QueryStageSchedulerEvent
* Remove revive offer event loop
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/src/event_loop.rs | 4 +
ballista/rust/scheduler/src/flight_sql.rs | 19 +-
.../rust/scheduler/src/scheduler_server/event.rs | 3 +
.../scheduler/src/scheduler_server/event_loop.rs | 415 --------------------
.../rust/scheduler/src/scheduler_server/grpc.rs | 56 +--
.../rust/scheduler/src/scheduler_server/mod.rs | 421 ++++++++-------------
.../src/scheduler_server/query_stage_scheduler.rs | 107 +++---
ballista/rust/scheduler/src/state/mod.rs | 409 +++++++++++++++++++-
ballista/rust/scheduler/src/state/task_manager.rs | 11 +-
ballista/rust/scheduler/src/test_utils.rs | 44 +--
10 files changed, 641 insertions(+), 848 deletions(-)
diff --git a/ballista/rust/core/src/event_loop.rs b/ballista/rust/core/src/event_loop.rs
index 595bd33f..74ee4ebf 100644
--- a/ballista/rust/core/src/event_loop.rs
+++ b/ballista/rust/core/src/event_loop.rs
@@ -128,6 +128,10 @@ pub struct EventSender<E> {
}
impl<E> EventSender<E> {
+ pub fn new(tx_event: mpsc::Sender<E>) -> Self {
+ Self { tx_event }
+ }
+
pub async fn post_event(&self, event: E) -> Result<()> {
self.tx_event.send(event).await.map_err(|e| {
BallistaError::General(format!("Fail to send event due to {}", e))
diff --git a/ballista/rust/scheduler/src/flight_sql.rs b/ballista/rust/scheduler/src/flight_sql.rs
index 08486fc2..ef98e0da 100644
--- a/ballista/rust/scheduler/src/flight_sql.rs
+++ b/ballista/rust/scheduler/src/flight_sql.rs
@@ -35,7 +35,6 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;
use tonic::{Response, Status, Streaming};
-use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SchedulerServer;
use arrow_flight::SchemaAsIpc;
use ballista_core::config::BallistaConfig;
@@ -243,22 +242,8 @@ impl FlightSqlServiceImpl {
plan: &LogicalPlan,
) -> Result<String, Status> {
let job_id = self.server.state.task_manager.generate_job_id();
- let query_stage_event_sender = self
- .server
- .query_stage_event_loop
- .get_sender()
- .map_err(|e| {
- Status::internal(format!(
- "Could not get query stage event sender due to: {}",
- e
- ))
- })?;
- query_stage_event_sender
- .post_event(QueryStageSchedulerEvent::JobQueued {
- job_id: job_id.clone(),
- session_ctx: ctx,
- plan: Box::new(plan.clone()),
- })
+ self.server
+ .submit_job(&job_id, ctx, plan)
.await
.map_err(|e| {
let msg =
diff --git a/ballista/rust/scheduler/src/scheduler_server/event.rs b/ballista/rust/scheduler/src/scheduler_server/event.rs
index 0e43bfe2..10793ccc 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event.rs
@@ -19,6 +19,7 @@ use crate::state::executor_manager::ExecutorReservation;
use datafusion::logical_plan::LogicalPlan;
+use ballista_core::serde::protobuf::TaskStatus;
use datafusion::prelude::SessionContext;
use std::sync::Arc;
@@ -42,4 +43,6 @@ pub enum QueryStageSchedulerEvent {
// For a job fails with its execution graph setting failed
JobRunningFailed(String),
JobUpdated(String),
+ TaskUpdating(String, Vec<TaskStatus>),
+ ReservationOffering(Vec<ExecutorReservation>),
}
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
deleted file mode 100644
index d60d145e..00000000
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ /dev/null
@@ -1,415 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use log::{error, info};
-
-use crate::scheduler_server::event::SchedulerServerEvent;
-use ballista_core::error::{BallistaError, Result};
-use ballista_core::event_loop::EventAction;
-
-use ballista_core::serde::AsExecutionPlan;
-use datafusion_proto::logical_plan::AsLogicalPlan;
-use tokio::sync::mpsc;
-
-use crate::state::executor_manager::ExecutorReservation;
-use crate::state::SchedulerState;
-
-/// EventAction which will process `SchedulerServerEvent`s.
-/// In push-based scheduling, this is the primary mechanism for scheduling
tasks
-/// on executors.
-pub(crate) struct SchedulerServerEventAction<
- T: 'static + AsLogicalPlan,
- U: 'static + AsExecutionPlan,
-> {
- state: Arc<SchedulerState<T, U>>,
-}
-
-impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
- SchedulerServerEventAction<T, U>
-{
- pub fn new(state: Arc<SchedulerState<T, U>>) -> Self {
- Self { state }
- }
-
- /// Process reservations which are offered. The basic process is
- /// 1. Attempt to fill the offered reservations with available tasks
- /// 2. For any reservation that filled, launch the assigned task on the
executor.
- /// 3. For any reservations that could not be filled, cancel the
reservation (i.e. return the
- /// task slot back to the pool of available task slots).
- ///
- /// NOTE Error handling in this method is very important. No matter what
we need to ensure
- /// that unfilled reservations are cancelled or else they could become
permanently "invisible"
- /// to the scheduler.
- async fn offer_reservation(
- &self,
- reservations: Vec<ExecutorReservation>,
- ) -> Result<Option<SchedulerServerEvent>> {
- let (free_list, pending_tasks) = match self
- .state
- .task_manager
- .fill_reservations(&reservations)
- .await
- {
- Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
- for (executor_id, task) in assignments.into_iter() {
- match self
- .state
- .executor_manager
- .get_executor_metadata(&executor_id)
- .await
- {
- Ok(executor) => {
- if let Err(e) =
- self.state.task_manager.launch_task(&executor,
task).await
- {
- error!("Failed to launch new task: {:?}", e);
- unassigned_reservations.push(
- ExecutorReservation::new_free(executor_id.clone()),
- );
- }
- }
- Err(e) => {
- error!("Failed to launch new task, could not get
executor metadata: {:?}", e);
- unassigned_reservations
- .push(ExecutorReservation::new_free(executor_id.clone()));
- }
- }
- }
- (unassigned_reservations, pending_tasks)
- }
- Err(e) => {
- error!("Error filling reservations: {:?}", e);
- (reservations, 0)
- }
- };
-
- dbg!(free_list.clone());
- dbg!(pending_tasks);
- // If any reserved slots remain, return them to the pool
- if !free_list.is_empty() {
- self.state
- .executor_manager
- .cancel_reservations(free_list)
- .await?;
- Ok(None)
- } else if pending_tasks > 0 {
- // If there are pending tasks available, try and schedule them
- let new_reservations = self
- .state
- .executor_manager
- .reserve_slots(pending_tasks as u32)
- .await?;
- Ok(Some(SchedulerServerEvent::Offer(new_reservations)))
- } else {
- Ok(None)
- }
- }
-}
-
-#[async_trait]
-impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
- EventAction<SchedulerServerEvent> for SchedulerServerEventAction<T, U>
-{
- fn on_start(&self) {
- info!("Starting SchedulerServerEvent handler")
- }
-
- fn on_stop(&self) {
- info!("Stopping SchedulerServerEvent handler")
- }
-
- async fn on_receive(
- &self,
- event: SchedulerServerEvent,
- tx_event: &mpsc::Sender<SchedulerServerEvent>,
- _rx_event: &mpsc::Receiver<SchedulerServerEvent>,
- ) -> Result<()> {
- match event {
- SchedulerServerEvent::Offer(reservations) => {
- if let Some(event) =
self.offer_reservation(reservations).await? {
- tx_event.send(event).await.map_err(|e| {
- BallistaError::General(format!("Fail to send event due
to {}", e))
- })?;
- }
- }
- }
-
- Ok(())
- }
-
- fn on_error(&self, error: BallistaError) {
- error!("Error in SchedulerServerEvent handler: {:?}", error);
- }
-}
-
-#[cfg(test)]
-mod test {
- use crate::scheduler_server::event::SchedulerServerEvent;
- use crate::scheduler_server::event_loop::SchedulerServerEventAction;
- use crate::state::backend::standalone::StandaloneClient;
- use crate::state::SchedulerState;
- use ballista_core::config::{BallistaConfig,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
- use ballista_core::error::Result;
- use ballista_core::serde::protobuf::{
- task_status, CompletedTask, PartitionId, PhysicalPlanNode,
ShuffleWritePartition,
- TaskStatus,
- };
- use ballista_core::serde::scheduler::{
- ExecutorData, ExecutorMetadata, ExecutorSpecification,
- };
- use ballista_core::serde::BallistaCodec;
- use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::execution::context::default_session_builder;
- use datafusion::logical_expr::{col, sum};
- use datafusion::physical_plan::ExecutionPlan;
- use datafusion::prelude::SessionContext;
- use datafusion::test_util::scan_empty;
- use datafusion_proto::protobuf::LogicalPlanNode;
- use std::sync::Arc;
-
- // We should free any reservations which are not assigned
- #[tokio::test]
- async fn test_offer_free_reservations() -> Result<()> {
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_default_scheduler_name(
- state_storage,
- default_session_builder,
- BallistaCodec::default(),
- ));
-
- let executors = test_executors(1, 4);
-
- let (executor_metadata, executor_data) = executors[0].clone();
-
- let reservations = state
- .executor_manager
- .register_executor(executor_metadata, executor_data, true)
- .await?;
-
- let event_action =
Arc::new(SchedulerServerEventAction::new(state.clone()));
-
- let result = event_action.offer_reservation(reservations).await?;
-
- assert!(result.is_none());
-
- // All reservations should have been cancelled so we should be able to
reserve them now
- let reservations = state.executor_manager.reserve_slots(4).await?;
-
- assert_eq!(reservations.len(), 4);
-
- Ok(())
- }
-
- // We should fill unbound reservations to any available task
- #[tokio::test]
- async fn test_offer_fill_reservations() -> Result<()> {
- let config = BallistaConfig::builder()
- .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
- .build()?;
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_default_scheduler_name(
- state_storage,
- default_session_builder,
- BallistaCodec::default(),
- ));
-
- let session_ctx = state.session_manager.create_session(&config).await?;
-
- let plan = test_graph(session_ctx.clone()).await;
-
- // Create 4 jobs so we have four pending tasks
- state
- .task_manager
- .submit_job("job-1", session_ctx.session_id().as_str(),
plan.clone())
- .await?;
- state
- .task_manager
- .submit_job("job-2", session_ctx.session_id().as_str(),
plan.clone())
- .await?;
- state
- .task_manager
- .submit_job("job-3", session_ctx.session_id().as_str(),
plan.clone())
- .await?;
- state
- .task_manager
- .submit_job("job-4", session_ctx.session_id().as_str(),
plan.clone())
- .await?;
-
- let executors = test_executors(1, 4);
-
- let (executor_metadata, executor_data) = executors[0].clone();
-
- let reservations = state
- .executor_manager
- .register_executor(executor_metadata, executor_data, true)
- .await?;
-
- let event_action =
Arc::new(SchedulerServerEventAction::new(state.clone()));
-
- let result = event_action.offer_reservation(reservations).await?;
-
- assert!(result.is_none());
-
- // All task slots should be assigned so we should not be able to
reserve more tasks
- let reservations = state.executor_manager.reserve_slots(4).await?;
-
- assert_eq!(reservations.len(), 0);
-
- Ok(())
- }
-
- // We should generate a new event for tasks that are still pending
- #[tokio::test]
- async fn test_offer_resubmit_pending() -> Result<()> {
- let config = BallistaConfig::builder()
- .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
- .build()?;
- let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
- Arc::new(SchedulerState::new_with_default_scheduler_name(
- state_storage,
- default_session_builder,
- BallistaCodec::default(),
- ));
-
- let session_ctx = state.session_manager.create_session(&config).await?;
-
- let plan = test_graph(session_ctx.clone()).await;
-
- // Create a job
- state
- .task_manager
- .submit_job("job-1", session_ctx.session_id().as_str(),
plan.clone())
- .await?;
-
- let executors = test_executors(1, 4);
-
- let (executor_metadata, executor_data) = executors[0].clone();
-
- // Complete the first stage. So we should now have 4 pending tasks for
this job stage 2
- let mut partitions: Vec<ShuffleWritePartition> = vec![];
-
- for partition_id in 0..4 {
- partitions.push(ShuffleWritePartition {
- partition_id: partition_id as u64,
- path: "some/path".to_string(),
- num_batches: 1,
- num_rows: 1,
- num_bytes: 1,
- })
- }
-
- state
- .task_manager
- .update_task_statuses(
- &executor_metadata,
- vec![TaskStatus {
- task_id: Some(PartitionId {
- job_id: "job-1".to_string(),
- stage_id: 1,
- partition_id: 0,
- }),
- metrics: vec![],
- status: Some(task_status::Status::Completed(CompletedTask {
- executor_id: "executor-1".to_string(),
- partitions,
- })),
- }],
- )
- .await?;
-
- state
- .executor_manager
- .register_executor(executor_metadata, executor_data, false)
- .await?;
-
- let reservation = state.executor_manager.reserve_slots(1).await?;
-
- assert_eq!(reservation.len(), 1);
-
- let event_action =
Arc::new(SchedulerServerEventAction::new(state.clone()));
-
- // Offer the reservation. It should be filled with one of the 4
pending tasks. The other 3 should
- // be reserved for the other 3 tasks, emitting another offer event
- let result = event_action.offer_reservation(reservation).await?;
-
- assert!(result.is_some());
-
- match result {
- Some(SchedulerServerEvent::Offer(reservations)) => {
- assert_eq!(reservations.len(), 3)
- }
- _ => panic!("Expected 3 new reservations offered"),
- }
-
- // Remaining 3 task slots should be reserved for pending tasks
- let reservations = state.executor_manager.reserve_slots(4).await?;
-
- assert_eq!(reservations.len(), 0);
-
- Ok(())
- }
-
- fn test_executors(
- total_executors: usize,
- slots_per_executor: u32,
- ) -> Vec<(ExecutorMetadata, ExecutorData)> {
- let mut result: Vec<(ExecutorMetadata, ExecutorData)> = vec![];
-
- for i in 0..total_executors {
- result.push((
- ExecutorMetadata {
- id: format!("executor-{}", i),
- host: format!("host-{}", i),
- port: 8080,
- grpc_port: 9090,
- specification: ExecutorSpecification {
- task_slots: slots_per_executor,
- },
- },
- ExecutorData {
- executor_id: format!("executor-{}", i),
- total_task_slots: slots_per_executor,
- available_task_slots: slots_per_executor,
- },
- ));
- }
-
- result
- }
-
- async fn test_graph(ctx: Arc<SessionContext>) -> Arc<dyn ExecutionPlan> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Utf8, false),
- Field::new("gmv", DataType::UInt64, false),
- ]);
-
- let plan = scan_empty(None, &schema, Some(vec![0, 1]))
- .unwrap()
- .aggregate(vec![col("id")], vec![sum(col("gmv"))])
- .unwrap()
- .build()
- .unwrap();
-
- ctx.create_physical_plan(&plan).await.unwrap()
- }
-}
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 80a9defc..0937158b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -16,7 +16,6 @@
// under the License.
use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy};
-
use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query};
use ballista_core::serde::protobuf::executor_registration::OptionalHost;
@@ -48,7 +47,6 @@ use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
use crate::scheduler_server::SchedulerServer;
use crate::state::executor_manager::ExecutorReservation;
@@ -189,30 +187,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
available_task_slots: metadata.specification.task_slots,
};
- if let Ok(Some(sender)) =
- self.event_loop.as_ref().map(|e| e.get_sender()).transpose()
- {
- // If we are using push-based scheduling then reserve this
executors slots and send
- // them for scheduling tasks.
+ async {
+ // Save the executor to state
let reservations = self
.state
- .executor_manager
- .register_executor(metadata, executor_data, true)
- .await
- .unwrap();
-
- sender
- .post_event(SchedulerServerEvent::Offer(reservations))
- .await
- .unwrap();
- } else {
- // Otherwise just save the executor to state
- self.state
.executor_manager
.register_executor(metadata, executor_data, false)
- .await
- .unwrap();
+ .await?;
+
+ // If we are using push-based scheduling then reserve this executors slots and send
+ // them for scheduling tasks.
+ if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
+ self.offer_reservation(reservations).await?;
+ }
+
+ Ok::<(), ballista_core::error::BallistaError>(())
}
+ .await
+ .map_err(|e| {
+ let msg = format!("Fail to do executor registration due to: {}", e);
+ error!("{}", msg);
+ Status::internal(msg)
+ })?;
Ok(Response::new(RegisterExecutorResult { success: true }))
} else {
@@ -414,20 +410,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let job_id = self.state.task_manager.generate_job_id();
- let query_stage_event_sender =
- self.query_stage_event_loop.get_sender().map_err(|e| {
- Status::internal(format!(
- "Could not get query stage event sender due to: {}",
- e
- ))
- })?;
-
- query_stage_event_sender
- .post_event(QueryStageSchedulerEvent::JobQueued {
- job_id: job_id.clone(),
- session_ctx,
- plan: Box::new(plan),
- })
+ self.submit_job(&job_id, session_ctx, &plan)
.await
.map_err(|e| {
let msg =
@@ -535,12 +518,13 @@ mod test {
#[tokio::test]
async fn test_poll_work() -> Result<(), BallistaError> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
+ let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
);
+ scheduler.init().await?;
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
optional_host:
Some(OptionalHost::Host("http://localhost:8080".to_owned())),
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index df376c4c..19ce8680 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -20,18 +20,15 @@ use std::time::{SystemTime, UNIX_EPOCH};
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
-use ballista_core::event_loop::{EventAction, EventLoop};
+use ballista_core::event_loop::EventLoop;
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::execution::context::{default_session_builder, SessionState};
-
-use datafusion::prelude::SessionConfig;
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::logical_plan::AsLogicalPlan;
-use log::error;
-
-use crate::scheduler_server::event::{QueryStageSchedulerEvent, SchedulerServerEvent};
-use crate::scheduler_server::event_loop::SchedulerServerEventAction;
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
use crate::state::backend::StateBackendClient;
use crate::state::executor_manager::ExecutorReservation;
@@ -44,7 +41,6 @@ pub mod externalscaler {
}
pub mod event;
-mod event_loop;
mod external_scaler;
mod grpc;
mod query_stage_scheduler;
@@ -57,7 +53,6 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
policy: TaskSchedulingPolicy,
- event_loop: Option<EventLoop<SchedulerServerEvent>>,
pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
}
@@ -105,30 +100,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduler_name.clone(),
));
- let event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>> =
- if matches!(policy, TaskSchedulingPolicy::PushStaged) {
- Some(Arc::new(SchedulerServerEventAction::new(state.clone())))
- } else {
- None
- };
- SchedulerServer::new_with_event_action(scheduler_name, state, event_action)
+ SchedulerServer::new_with_state(scheduler_name, policy, state)
}
- fn new_with_event_action(
+ pub(crate) fn new_with_state(
scheduler_name: String,
+ policy: TaskSchedulingPolicy,
state: Arc<SchedulerState<T, U>>,
- event_action: Option<Arc<dyn EventAction<SchedulerServerEvent>>>,
) -> Self {
- let event_loop = event_action.map(|event_action| {
- EventLoop::new("scheduler".to_owned(), 10000, event_action)
- });
- let policy = if event_loop.is_some() {
- TaskSchedulingPolicy::PushStaged
- } else {
- TaskSchedulingPolicy::PullStaged
- };
let query_stage_scheduler =
- Arc::new(QueryStageScheduler::new(state.clone(), None));
+ Arc::new(QueryStageScheduler::new(state.clone(), policy));
let query_stage_event_loop =
EventLoop::new("query_stage".to_owned(), 10000, query_stage_scheduler);
Self {
@@ -139,97 +120,56 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
.unwrap()
.as_millis(),
policy,
- event_loop,
query_stage_event_loop,
}
}
pub async fn init(&mut self) -> Result<()> {
- {
- // initialize state
- self.state.init().await?;
- }
-
- {
- if let Some(event_loop) = self.event_loop.as_mut() {
- event_loop.start()?;
-
- let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
- self.state.clone(),
- Some(event_loop.get_sender()?),
- ));
- let query_stage_event_loop = EventLoop::new(
- self.query_stage_event_loop.name.clone(),
- self.query_stage_event_loop.buffer_size,
- query_stage_scheduler,
- );
- self.query_stage_event_loop = query_stage_event_loop;
- }
-
- self.query_stage_event_loop.start()?;
- }
+ self.state.init().await?;
+ self.query_stage_event_loop.start()?;
Ok(())
}
+ pub(crate) async fn submit_job(
+ &self,
+ job_id: &str,
+ ctx: Arc<SessionContext>,
+ plan: &LogicalPlan,
+ ) -> Result<()> {
+ self.query_stage_event_loop
+ .get_sender()?
+ .post_event(QueryStageSchedulerEvent::JobQueued {
+ job_id: job_id.to_owned(),
+ session_ctx: ctx,
+ plan: Box::new(plan.clone()),
+ })
+ .await
+ }
+
+ /// It just send task status update event to the channel,
+ /// and will not guarantee the event processing completed after return
pub(crate) async fn update_task_status(
&self,
executor_id: &str,
tasks_status: Vec<TaskStatus>,
) -> Result<()> {
- let num_status = tasks_status.len();
- let executor = self
- .state
- .executor_manager
- .get_executor_metadata(executor_id)
- .await?;
-
- match self
- .state
- .task_manager
- .update_task_statuses(&executor, tasks_status)
+ self.query_stage_event_loop
+ .get_sender()?
+ .post_event(QueryStageSchedulerEvent::TaskUpdating(
+ executor_id.to_owned(),
+ tasks_status,
+ ))
.await
- {
- Ok((stage_events, offers)) => {
- if let Some(event_loop) = self.event_loop.as_ref() {
- event_loop
- .get_sender()?
- .post_event(SchedulerServerEvent::Offer(offers))
- .await?;
- }
-
- for stage_event in stage_events {
- self.post_stage_event(stage_event).await?;
- }
- }
- Err(e) => {
- error!(
- "Failed to update {} task statuses for executor {}: {:?}",
- num_status, executor_id, e
- );
- // In case task update fails, make sure to free reservations
- if let Some(event_loop) = self.event_loop.as_ref() {
- let mut reservations = vec![];
- for _ in 0..num_status {
- reservations
- .push(ExecutorReservation::new_free(executor_id.to_owned()));
- }
-
- event_loop
- .get_sender()?
- .post_event(SchedulerServerEvent::Offer(reservations))
- .await?;
- }
- }
- }
-
- Ok(())
}
- async fn post_stage_event(&self, event: QueryStageSchedulerEvent) -> Result<()> {
+ pub(crate) async fn offer_reservation(
+ &self,
+ reservations: Vec<ExecutorReservation>,
+ ) -> Result<()> {
self.query_stage_event_loop
.get_sender()?
- .post_event(event)
+ .post_event(QueryStageSchedulerEvent::ReservationOffering(reservations))
.await
}
}
@@ -249,8 +189,7 @@ mod test {
use ballista_core::config::{
BallistaConfig, TaskSchedulingPolicy,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
};
- use ballista_core::error::{BallistaError, Result};
- use ballista_core::event_loop::EventAction;
+ use ballista_core::error::Result;
use ballista_core::serde::protobuf::{
job_status, task_status, CompletedTask, FailedTask, JobStatus,
PartitionId,
@@ -261,17 +200,12 @@ mod test {
};
use ballista_core::serde::BallistaCodec;
- use crate::scheduler_server::event::{
- QueryStageSchedulerEvent, SchedulerServerEvent,
- };
use crate::scheduler_server::SchedulerServer;
use crate::state::backend::standalone::StandaloneClient;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::SchedulerState;
- use crate::test_utils::{
- await_condition, ExplodingTableProvider, SchedulerEventObserver,
- };
+ use crate::test_utils::{await_condition, ExplodingTableProvider};
#[tokio::test]
async fn test_pull_scheduling() -> Result<()> {
@@ -297,33 +231,12 @@ mod test {
.create_session(&config)
.await?;
- let plan = async {
- let optimized_plan = ctx.optimize(&plan).map_err(|e| {
- BallistaError::General(format!(
- "Could not create optimized logical plan: {}",
- e
- ))
- })?;
-
- ctx.create_physical_plan(&optimized_plan)
- .await
- .map_err(|e| {
- BallistaError::General(format!(
- "Could not create physical plan: {}",
- e
- ))
- })
- }
- .await?;
-
let job_id = "job";
- let session_id = ctx.session_id();
// Submit job
scheduler
.state
- .task_manager
- .submit_job(job_id, &session_id, plan)
+ .submit_job(job_id, ctx, &plan)
.await
.expect("submitting plan");
@@ -371,7 +284,8 @@ mod test {
};
scheduler
- .update_task_status("executor-1", vec![task_status])
+ .state
+ .update_task_statuses("executor-1", vec![task_status])
.await?;
} else {
break;
@@ -405,13 +319,7 @@ mod test {
let plan = test_plan();
let task_slots = 4;
- let (sender, mut event_receiver) =
- tokio::sync::mpsc::channel::<SchedulerServerEvent>(1000);
- let (error_sender, _) =
tokio::sync::mpsc::channel::<BallistaError>(1000);
-
- let event_action = SchedulerEventObserver::new(sender, error_sender);
-
- let scheduler =
test_scheduler_with_event_action(Arc::new(event_action)).await?;
+ let scheduler = test_push_staged_scheduler().await?;
let executors = test_executors(task_slots);
for (executor_metadata, executor_data) in executors {
@@ -432,21 +340,39 @@ mod test {
let job_id = "job";
- // Send JobQueued event to kick off the event loop
- scheduler
- .query_stage_event_loop
- .get_sender()?
- .post_event(QueryStageSchedulerEvent::JobQueued {
- job_id: job_id.to_owned(),
- session_ctx: ctx,
- plan: Box::new(plan),
- })
- .await?;
+ scheduler.state.submit_job(job_id, ctx, &plan).await?;
// Complete tasks that are offered through scheduler events
- while let Some(SchedulerServerEvent::Offer(reservations)) =
- event_receiver.recv().await
- {
+ loop {
+ // Check condition
+ let available_tasks = {
+ let graph = scheduler
+ .state
+ .task_manager
+ .get_active_execution_graph(job_id)
+ .await
+ .unwrap();
+ let graph = graph.read().await;
+ if graph.complete() {
+ break;
+ }
+ graph.available_tasks()
+ };
+
+ if available_tasks == 0 {
+ tokio::time::sleep(Duration::from_millis(5)).await;
+ continue;
+ }
+
+ let reservations: Vec<ExecutorReservation> = scheduler
+ .state
+ .executor_manager
+ .reserve_slots(available_tasks as u32)
+ .await?
+ .into_iter()
+ .map(|res| res.assign(job_id.to_owned()))
+ .collect();
+
let free_list = match scheduler
.state
.task_manager
@@ -454,11 +380,6 @@ mod test {
.await
{
Ok((assignments, mut unassigned_reservations, _)) => {
- // Break when we are no longer assigning tasks
- if unassigned_reservations.len() == reservations.len() {
- break;
- }
-
for (executor_id, task) in assignments.into_iter() {
match scheduler
.state
@@ -544,13 +465,7 @@ mod test {
let plan = test_plan();
let task_slots = 4;
- let (sender, mut event_receiver) =
- tokio::sync::mpsc::channel::<SchedulerServerEvent>(1000);
- let (error_sender, _) =
tokio::sync::mpsc::channel::<BallistaError>(1000);
-
- let event_action = SchedulerEventObserver::new(sender, error_sender);
-
- let scheduler =
test_scheduler_with_event_action(Arc::new(event_action)).await?;
+ let scheduler = test_push_staged_scheduler().await?;
let executors = test_executors(task_slots);
for (executor_metadata, executor_data) in executors {
@@ -571,94 +486,92 @@ mod test {
let job_id = "job";
- // Send JobQueued event to kick off the event loop
- scheduler
- .query_stage_event_loop
- .get_sender()?
- .post_event(QueryStageSchedulerEvent::JobQueued {
- job_id: job_id.to_owned(),
- session_ctx: ctx,
- plan: Box::new(plan),
- })
+ scheduler.state.submit_job(job_id, ctx, &plan).await?;
+
+ let available_tasks = scheduler
+ .state
+ .task_manager
+ .get_available_task_count(job_id)
.await?;
+ let reservations: Vec<ExecutorReservation> = scheduler
+ .state
+ .executor_manager
+ .reserve_slots(available_tasks as u32)
+ .await?
+ .into_iter()
+ .map(|res| res.assign(job_id.to_owned()))
+ .collect();
+
// Complete tasks that are offered through scheduler events
- if let Some(SchedulerServerEvent::Offer(reservations)) =
- event_receiver.recv().await
+ let free_list = match scheduler
+ .state
+ .task_manager
+ .fill_reservations(&reservations)
+ .await
{
- let free_list = match scheduler
- .state
- .task_manager
- .fill_reservations(&reservations)
- .await
- {
- Ok((assignments, mut unassigned_reservations, _)) => {
- for (executor_id, task) in assignments.into_iter() {
- match scheduler
- .state
- .executor_manager
- .get_executor_metadata(&executor_id)
- .await
- {
- Ok(executor) => {
- let mut partitions: Vec<ShuffleWritePartition>
= vec![];
-
- let num_partitions = task
- .output_partitioning
- .map(|p| p.partition_count())
- .unwrap_or(1);
-
- for partition_id in 0..num_partitions {
- partitions.push(ShuffleWritePartition {
- partition_id: partition_id as u64,
- path: "some/path".to_string(),
- num_batches: 1,
- num_rows: 1,
- num_bytes: 1,
- })
- }
-
- // Complete the task
- let task_status = TaskStatus {
- status: Some(task_status::Status::Failed(
- FailedTask {
- error: "".to_string(),
- },
- )),
- metrics: vec![],
- task_id: Some(PartitionId {
- job_id: job_id.to_owned(),
- stage_id: task.partition.stage_id as
u32,
- partition_id:
task.partition.partition_id as u32,
- }),
- };
-
- scheduler
- .update_task_status(&executor.id,
vec![task_status])
- .await?;
- }
- Err(_e) => {
- unassigned_reservations.push(
-
ExecutorReservation::new_free(executor_id.clone()),
- );
+ Ok((assignments, mut unassigned_reservations, _)) => {
+ for (executor_id, task) in assignments.into_iter() {
+ match scheduler
+ .state
+ .executor_manager
+ .get_executor_metadata(&executor_id)
+ .await
+ {
+ Ok(executor) => {
+ let mut partitions: Vec<ShuffleWritePartition> =
vec![];
+
+ let num_partitions = task
+ .output_partitioning
+ .map(|p| p.partition_count())
+ .unwrap_or(1);
+
+ for partition_id in 0..num_partitions {
+ partitions.push(ShuffleWritePartition {
+ partition_id: partition_id as u64,
+ path: "some/path".to_string(),
+ num_batches: 1,
+ num_rows: 1,
+ num_bytes: 1,
+ })
}
+
+ // Complete the task
+ let task_status = TaskStatus {
+ status:
Some(task_status::Status::Failed(FailedTask {
+ error: "".to_string(),
+ })),
+ metrics: vec![],
+ task_id: Some(PartitionId {
+ job_id: job_id.to_owned(),
+ stage_id: task.partition.stage_id as u32,
+ partition_id: task.partition.partition_id
as u32,
+ }),
+ };
+
+ scheduler
+ .state
+ .update_task_statuses(&executor.id,
vec![task_status])
+ .await?;
+ }
+ Err(_e) => {
+ unassigned_reservations
+
.push(ExecutorReservation::new_free(executor_id.clone()));
}
}
- unassigned_reservations
}
- Err(_e) => reservations,
- };
-
- // If any reserved slots remain, return them to the pool
- if !free_list.is_empty() {
- scheduler
- .state
- .executor_manager
- .cancel_reservations(free_list)
- .await?;
+ unassigned_reservations
}
- } else {
- panic!("No reservations offered");
+ Err(_e) => reservations,
+ };
+
+ // If any reserved slots remain, return them to the pool
+ if !free_list.is_empty() {
+ scheduler
+ .state
+ .executor_manager
+ .cancel_reservations(free_list)
+ .await?;
}
let status =
scheduler.state.task_manager.get_job_status(job_id).await?;
@@ -682,13 +595,7 @@ mod test {
async fn test_planning_failure() -> Result<()> {
let task_slots = 4;
- let (sender, _event_receiver) =
- tokio::sync::mpsc::channel::<SchedulerServerEvent>(1000);
- let (error_sender, _) =
tokio::sync::mpsc::channel::<BallistaError>(1000);
-
- let event_action = SchedulerEventObserver::new(sender, error_sender);
-
- let scheduler =
test_scheduler_with_event_action(Arc::new(event_action)).await?;
+ let scheduler = test_push_staged_scheduler().await?;
let config = test_session(task_slots);
@@ -704,17 +611,8 @@ mod test {
let job_id = "job";
- // Send JobQueued event to kick off the event loop
// This should fail when we try and create the physical plan
- scheduler
- .query_stage_event_loop
- .get_sender()?
- .post_event(QueryStageSchedulerEvent::JobQueued {
- job_id: job_id.to_owned(),
- session_ctx: ctx,
- plan: Box::new(plan),
- })
- .await?;
+ scheduler.submit_job(job_id, ctx, &plan).await?;
let scheduler = scheduler.clone();
@@ -754,8 +652,7 @@ mod test {
Ok(scheduler)
}
- async fn test_scheduler_with_event_action(
- event_action: Arc<dyn EventAction<SchedulerServerEvent>>,
+ async fn test_push_staged_scheduler(
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let state = Arc::new(SchedulerState::new_with_default_scheduler_name(
@@ -764,10 +661,10 @@ mod test {
BallistaCodec::default(),
));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
- SchedulerServer::new_with_event_action(
+ SchedulerServer::new_with_state(
"localhost:50050".to_owned(),
+ TaskSchedulingPolicy::PushStaged,
state,
- Some(event_action),
);
scheduler.init().await?;
diff --git
a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 1c403ea6..53fe38f5 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -16,21 +16,19 @@
// under the License.
use std::sync::Arc;
-use std::time::Instant;
use async_trait::async_trait;
-use datafusion::logical_plan::LogicalPlan;
-use datafusion::prelude::SessionContext;
use log::{debug, error, info};
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventSender};
+use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::AsExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use tokio::sync::mpsc;
-use crate::scheduler_server::event::{QueryStageSchedulerEvent,
SchedulerServerEvent};
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::SchedulerState;
@@ -40,18 +38,15 @@ pub(crate) struct QueryStageScheduler<
U: 'static + AsExecutionPlan,
> {
state: Arc<SchedulerState<T, U>>,
- event_sender: Option<EventSender<SchedulerServerEvent>>,
+ policy: TaskSchedulingPolicy,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
QueryStageScheduler<T, U> {
pub(crate) fn new(
state: Arc<SchedulerState<T, U>>,
- event_sender: Option<EventSender<SchedulerServerEvent>>,
+ policy: TaskSchedulingPolicy,
) -> Self {
- Self {
- state,
- event_sender,
- }
+ Self { state, policy }
}
}
@@ -73,6 +68,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
tx_event: &mpsc::Sender<QueryStageSchedulerEvent>,
_rx_event: &mpsc::Receiver<QueryStageSchedulerEvent>,
) -> Result<()> {
+ let tx_event = EventSender::new(tx_event.clone());
match event {
QueryStageSchedulerEvent::JobQueued {
job_id,
@@ -81,11 +77,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
} => {
info!("Job {} queued", job_id);
let state = self.state.clone();
- let tx_event = tx_event.clone();
tokio::spawn(async move {
let event = if let Err(e) =
- submit_job(state.clone(), job_id.clone(), session_ctx,
&plan)
- .await
+ state.submit_job(&job_id, session_ctx, &plan).await
{
let msg = format!("Error planning job {}: {:?}",
job_id, e);
error!("{}", &msg);
@@ -94,17 +88,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
QueryStageSchedulerEvent::JobSubmitted(job_id)
};
tx_event
- .send(event)
+ .post_event(event)
.await
- .map_err(|e| {
- error!("Fail to send event due to {}", e);
- })
+ .map_err(|e| error!("Fail to send event due to {}", e))
.unwrap();
});
}
QueryStageSchedulerEvent::JobSubmitted(job_id) => {
info!("Job {} submitted", job_id);
- if let Some(sender) = &self.event_sender {
+ if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
let available_tasks = self
.state
.task_manager
@@ -126,16 +118,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
job_id
);
- if let Err(e) = sender
-
.post_event(SchedulerServerEvent::Offer(reservations.clone()))
- .await
- {
- error!("Error posting offer: {:?}", e);
- self.state
- .executor_manager
- .cancel_reservations(reservations)
- .await?;
- }
+ tx_event
+
.post_event(QueryStageSchedulerEvent::ReservationOffering(
+ reservations,
+ ))
+ .await?;
}
}
QueryStageSchedulerEvent::JobPlanningFailed(job_id, fail_message)
=> {
@@ -157,6 +144,45 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
error!("Job {} Updated", job_id);
self.state.task_manager.update_job(&job_id).await?;
}
+ QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status)
=> {
+ let num_status = tasks_status.len();
+ match self
+ .state
+ .update_task_statuses(&executor_id, tasks_status)
+ .await
+ {
+ Ok((stage_events, offers)) => {
+ if matches!(self.policy,
TaskSchedulingPolicy::PushStaged) {
+ tx_event
+ .post_event(
+
QueryStageSchedulerEvent::ReservationOffering(offers),
+ )
+ .await?;
+ }
+
+ for stage_event in stage_events {
+ tx_event.post_event(stage_event).await?;
+ }
+ }
+ Err(e) => {
+ error!(
+ "Failed to update {} task statuses for executor
{}: {:?}",
+ num_status, executor_id, e
+ );
+ // TODO error handling
+ }
+ }
+ }
+ QueryStageSchedulerEvent::ReservationOffering(reservations) => {
+ let reservations =
self.state.offer_reservation(reservations).await?;
+ if !reservations.is_empty() {
+ tx_event
+
.post_event(QueryStageSchedulerEvent::ReservationOffering(
+ reservations,
+ ))
+ .await?;
+ }
+ }
}
Ok(())
@@ -166,28 +192,3 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
error!("Error received by QueryStageScheduler: {:?}", error);
}
}
-
-async fn submit_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
- state: Arc<SchedulerState<T, U>>,
- job_id: String,
- session_ctx: Arc<SessionContext>,
- plan: &LogicalPlan,
-) -> Result<()> {
- let start = Instant::now();
- let optimized_plan = session_ctx.optimize(plan)?;
-
- debug!("Calculated optimized plan: {:?}", optimized_plan);
-
- let plan = session_ctx.create_physical_plan(&optimized_plan).await?;
-
- state
- .task_manager
- .submit_job(&job_id, &session_ctx.session_id(), plan.clone())
- .await?;
-
- let elapsed = start.elapsed();
-
- info!("Planned job {} in {:?}", job_id, elapsed);
-
- Ok(())
-}
diff --git a/ballista/rust/scheduler/src/state/mod.rs
b/ballista/rust/scheduler/src/state/mod.rs
index 5668c6b1..43cb25e6 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -17,24 +17,25 @@
use std::any::type_name;
use std::future::Future;
-
use std::sync::Arc;
+use std::time::Instant;
-use prost::Message;
-
-use ballista_core::error::{BallistaError, Result};
-
+use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SessionBuilder;
-
-use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion_proto::logical_plan::AsLogicalPlan;
-
use crate::state::backend::{Lock, StateBackendClient};
-
-use crate::state::executor_manager::ExecutorManager;
+use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::session_manager::SessionManager;
use crate::state::task_manager::TaskManager;
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::serde::protobuf::TaskStatus;
+use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::prelude::SessionContext;
+use datafusion_proto::logical_plan::AsLogicalPlan;
+use log::{debug, error, info};
+use prost::Message;
+
pub mod backend;
pub mod execution_graph;
pub mod executor_manager;
@@ -122,6 +123,151 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
pub async fn init(&self) -> Result<()> {
self.executor_manager.init().await
}
+
+ /// Apply a batch of task status updates reported by `executor_id`.
+ ///
+ /// On success returns the events produced by the task manager, paired with
+ /// one free (unassigned) reservation on `executor_id` for every status in
+ /// the batch.
+ ///
+ /// Fails if the executor metadata cannot be looked up or if the task
+ /// manager rejects the update.
+ #[cfg(not(test))]
+ pub(crate) async fn update_task_statuses(
+     &self,
+     executor_id: &str,
+     tasks_status: Vec<TaskStatus>,
+ ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
+     let executor = self
+         .executor_manager
+         .get_executor_metadata(executor_id)
+         .await?;
+
+     // One free reservation per reported status; built before `tasks_status`
+     // is moved into the task manager.
+     let freed_slots: Vec<ExecutorReservation> =
+         std::iter::repeat_with(|| ExecutorReservation::new_free(executor_id.to_owned()))
+             .take(tasks_status.len())
+             .collect();
+
+     let stage_events = self
+         .task_manager
+         .update_task_statuses(&executor, tasks_status)
+         .await?;
+
+     Ok((stage_events, freed_slots))
+ }
+
+ /// Test-build variant of `update_task_statuses`.
+ ///
+ /// Applies the status updates through the task manager, then hands one
+ /// free reservation per reported status straight back to the executor
+ /// manager via `cancel_reservations`. The returned reservation list is
+ /// therefore always empty.
+ #[cfg(test)]
+ pub(crate) async fn update_task_statuses(
+     &self,
+     executor_id: &str,
+     tasks_status: Vec<TaskStatus>,
+ ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
+     let executor = self
+         .executor_manager
+         .get_executor_metadata(executor_id)
+         .await?;
+
+     // Built before `tasks_status` is moved into the task manager.
+     let freed_slots: Vec<ExecutorReservation> =
+         std::iter::repeat_with(|| ExecutorReservation::new_free(executor_id.to_owned()))
+             .take(tasks_status.len())
+             .collect();
+
+     let stage_events = self
+         .task_manager
+         .update_task_statuses(&executor, tasks_status)
+         .await?;
+
+     // Return the slots to the pool only after the update has succeeded.
+     self.executor_manager
+         .cancel_reservations(freed_slots)
+         .await?;
+
+     Ok((stage_events, Vec::new()))
+ }
+
+ /// Process reservations which are offered. The basic process is
+ /// 1. Attempt to fill the offered reservations with available tasks
+ /// 2. For any reservation that filled, launch the assigned task on the executor.
+ /// 3. For any reservations that could not be filled, cancel the reservation (i.e. return the
+ ///    task slot back to the pool of available task slots).
+ ///
+ /// NOTE Error handling in this method is very important. No matter what we need to ensure
+ /// that unfilled reservations are cancelled or else they could become permanently "invisible"
+ /// to the scheduler.
+ ///
+ /// Returns the reservations newly taken for tasks that are still pending. The list is empty
+ /// when some offered reservations went unfilled (those are cancelled instead) or when
+ /// `fill_reservations` reported no pending tasks.
+ ///
+ /// # Errors
+ /// Failures while filling reservations, fetching executor metadata or launching a task are
+ /// logged and the affected slot is added to the free list. Only failures from
+ /// `cancel_reservations` or `reserve_slots` are propagated to the caller.
+ pub(crate) async fn offer_reservation(
+     &self,
+     reservations: Vec<ExecutorReservation>,
+ ) -> Result<Vec<ExecutorReservation>> {
+     let (free_list, pending_tasks) = match self
+         .task_manager
+         .fill_reservations(&reservations)
+         .await
+     {
+         Ok((assignments, mut unassigned_reservations, pending_tasks)) => {
+             for (executor_id, task) in assignments.into_iter() {
+                 match self
+                     .executor_manager
+                     .get_executor_metadata(&executor_id)
+                     .await
+                 {
+                     Ok(executor) => {
+                         if let Err(e) =
+                             self.task_manager.launch_task(&executor, task).await
+                         {
+                             error!("Failed to launch new task: {:?}", e);
+                             // The slot was never used, so hand it back as a free reservation.
+                             unassigned_reservations.push(
+                                 ExecutorReservation::new_free(executor_id.clone()),
+                             );
+                         }
+                     }
+                     Err(e) => {
+                         error!("Failed to launch new task, could not get executor metadata: {:?}", e);
+                         unassigned_reservations
+                             .push(ExecutorReservation::new_free(executor_id.clone()));
+                     }
+                 }
+             }
+             (unassigned_reservations, pending_tasks)
+         }
+         Err(e) => {
+             error!("Error filling reservations: {:?}", e);
+             // Nothing was assigned: every offered reservation must be released.
+             (reservations, 0)
+         }
+     };
+
+     // Regular debug logging instead of leftover `dbg!` calls, which print to stderr
+     // unconditionally and required cloning the whole free list.
+     debug!(
+         "Processed reservation offer: unfilled reservations {:?}, pending tasks {}",
+         free_list, pending_tasks
+     );
+
+     let mut new_reservations = vec![];
+     if !free_list.is_empty() {
+         // If any reserved slots remain, return them to the pool
+         self.executor_manager.cancel_reservations(free_list).await?;
+     } else if pending_tasks > 0 {
+         // If there are pending tasks available, try and schedule them
+         let pending_reservations = self
+             .executor_manager
+             .reserve_slots(pending_tasks as u32)
+             .await?;
+         new_reservations.extend(pending_reservations);
+     }
+
+     Ok(new_reservations)
+ }
+
+ /// Plan `plan` within `session_ctx` and register the resulting physical
+ /// plan with the task manager under `job_id`.
+ ///
+ /// The logical plan is optimized first, then turned into a physical plan.
+ /// Any failure in optimization, physical planning or job registration is
+ /// returned to the caller. The elapsed planning time is logged at info level.
+ pub(crate) async fn submit_job(
+     &self,
+     job_id: &str,
+     session_ctx: Arc<SessionContext>,
+     plan: &LogicalPlan,
+ ) -> Result<()> {
+     let planning_started = Instant::now();
+
+     let optimized = session_ctx.optimize(plan)?;
+     debug!("Calculated optimized plan: {:?}", optimized);
+
+     let physical_plan = session_ctx.create_physical_plan(&optimized).await?;
+     let session_id = session_ctx.session_id();
+
+     self.task_manager
+         .submit_job(job_id, &session_id, physical_plan)
+         .await?;
+
+     let planning_time = planning_started.elapsed();
+     info!("Planned job {} in {:?}", job_id, planning_time);
+
+     Ok(())
+ }
}
pub async fn with_lock<Out, F: Future<Output = Out>>(lock: Box<dyn Lock>, op:
F) -> Out {
@@ -132,5 +278,242 @@ pub async fn with_lock<Out, F: Future<Output =
Out>>(lock: Box<dyn Lock>, op: F)
result
}
-#[cfg(all(test, feature = "sled"))]
-mod test {}
+#[cfg(test)]
+mod test {
+ use crate::state::backend::standalone::StandaloneClient;
+ use crate::state::SchedulerState;
+ use ballista_core::config::{BallistaConfig,
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS};
+ use ballista_core::error::Result;
+ use ballista_core::serde::protobuf::{
+ task_status, CompletedTask, PartitionId, PhysicalPlanNode,
ShuffleWritePartition,
+ TaskStatus,
+ };
+ use ballista_core::serde::scheduler::{
+ ExecutorData, ExecutorMetadata, ExecutorSpecification,
+ };
+ use ballista_core::serde::BallistaCodec;
+ use datafusion::arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::execution::context::default_session_builder;
+ use datafusion::logical_expr::{col, sum};
+ use datafusion::physical_plan::ExecutionPlan;
+ use datafusion::prelude::SessionContext;
+ use datafusion::test_util::scan_empty;
+ use datafusion_proto::protobuf::LogicalPlanNode;
+ use std::sync::Arc;
+
+ // Offering reservations when no job has been submitted must release every
+ // one of them back to the executor.
+ #[tokio::test]
+ async fn test_offer_free_reservations() -> Result<()> {
+     let backend = Arc::new(StandaloneClient::try_new_temporary()?);
+     let scheduler_state = SchedulerState::new_with_default_scheduler_name(
+         backend,
+         default_session_builder,
+         BallistaCodec::default(),
+     );
+     let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
+         Arc::new(scheduler_state);
+
+     // A single executor with four task slots, registered with `true` as the
+     // third argument so that registration returns reservations.
+     let (metadata, data) = test_executors(1, 4)
+         .into_iter()
+         .next()
+         .expect("test_executors(1, 4) yields one executor");
+     let offered = state
+         .executor_manager
+         .register_executor(metadata, data, true)
+         .await?;
+
+     // Nothing is pending, so no follow-up reservations are handed back.
+     let follow_up = state.offer_reservation(offered).await?;
+     assert!(follow_up.is_empty());
+
+     // All reservations should have been cancelled so we should be able to reserve them now
+     let reserved = state.executor_manager.reserve_slots(4).await?;
+     assert_eq!(reserved.len(), 4);
+
+     Ok(())
+ }
+
+ // Unbound reservations should be filled with whatever tasks are available.
+ #[tokio::test]
+ async fn test_offer_fill_reservations() -> Result<()> {
+     let config = BallistaConfig::builder()
+         .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
+         .build()?;
+     let backend = Arc::new(StandaloneClient::try_new_temporary()?);
+     let scheduler_state = SchedulerState::new_with_default_scheduler_name(
+         backend,
+         default_session_builder,
+         BallistaCodec::default(),
+     );
+     let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
+         Arc::new(scheduler_state);
+
+     let session_ctx = state.session_manager.create_session(&config).await?;
+     let plan = test_graph(session_ctx.clone()).await;
+
+     // Create 4 jobs so we have four pending tasks
+     for job_id in ["job-1", "job-2", "job-3", "job-4"].iter().copied() {
+         state
+             .task_manager
+             .submit_job(job_id, session_ctx.session_id().as_str(), plan.clone())
+             .await?;
+     }
+
+     // A single executor with four task slots; registration reserves them.
+     let (metadata, data) = test_executors(1, 4)
+         .into_iter()
+         .next()
+         .expect("test_executors(1, 4) yields one executor");
+     let offered = state
+         .executor_manager
+         .register_executor(metadata, data, true)
+         .await?;
+
+     let follow_up = state.offer_reservation(offered).await?;
+     assert!(follow_up.is_empty());
+
+     // All task slots should be assigned so we should not be able to reserve more tasks
+     let reserved = state.executor_manager.reserve_slots(4).await?;
+     assert_eq!(reserved.len(), 0);
+
+     Ok(())
+ }
+
+ // When tasks are still pending after an offer has been filled, the offer
+ // should hand back fresh reservations for them.
+ #[tokio::test]
+ async fn test_offer_resubmit_pending() -> Result<()> {
+     let config = BallistaConfig::builder()
+         .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
+         .build()?;
+     let backend = Arc::new(StandaloneClient::try_new_temporary()?);
+     let scheduler_state = SchedulerState::new_with_default_scheduler_name(
+         backend,
+         default_session_builder,
+         BallistaCodec::default(),
+     );
+     let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
+         Arc::new(scheduler_state);
+
+     let session_ctx = state.session_manager.create_session(&config).await?;
+     let plan = test_graph(session_ctx.clone()).await;
+
+     // Create a job
+     state
+         .task_manager
+         .submit_job("job-1", session_ctx.session_id().as_str(), plan.clone())
+         .await?;
+
+     let (executor_metadata, executor_data) = test_executors(1, 4)
+         .into_iter()
+         .next()
+         .expect("test_executors(1, 4) yields one executor");
+
+     // Complete the first stage. So we should now have 4 pending tasks for this job stage 2
+     let partitions: Vec<ShuffleWritePartition> = (0..4u64)
+         .map(|partition_id| ShuffleWritePartition {
+             partition_id,
+             path: "some/path".to_string(),
+             num_batches: 1,
+             num_rows: 1,
+             num_bytes: 1,
+         })
+         .collect();
+
+     // NOTE(review): the completed task names "executor-1", while
+     // test_executors(1, 4) creates "executor-0" - confirm this is intended.
+     let stage_one_done = TaskStatus {
+         task_id: Some(PartitionId {
+             job_id: "job-1".to_string(),
+             stage_id: 1,
+             partition_id: 0,
+         }),
+         metrics: vec![],
+         status: Some(task_status::Status::Completed(CompletedTask {
+             executor_id: "executor-1".to_string(),
+             partitions,
+         })),
+     };
+
+     state
+         .task_manager
+         .update_task_statuses(&executor_metadata, vec![stage_one_done])
+         .await?;
+
+     // Register with `false` as the third argument, then reserve a single
+     // slot explicitly.
+     state
+         .executor_manager
+         .register_executor(executor_metadata, executor_data, false)
+         .await?;
+
+     let single_slot = state.executor_manager.reserve_slots(1).await?;
+     assert_eq!(single_slot.len(), 1);
+
+     // Offer the reservation. It should be filled with one of the 4 pending tasks. The other 3 should
+     // be reserved for the other 3 tasks, emitting another offer event
+     let follow_up = state.offer_reservation(single_slot).await?;
+     assert_eq!(follow_up.len(), 3);
+
+     // Remaining 3 task slots should be reserved for pending tasks
+     let leftover = state.executor_manager.reserve_slots(4).await?;
+     assert_eq!(leftover.len(), 0);
+
+     Ok(())
+ }
+
+ /// Build `total_executors` executor fixtures named `executor-{i}` on
+ /// `host-{i}`, each advertising `slots_per_executor` task slots, all of
+ /// which are marked available.
+ fn test_executors(
+     total_executors: usize,
+     slots_per_executor: u32,
+ ) -> Vec<(ExecutorMetadata, ExecutorData)> {
+     (0..total_executors)
+         .map(|i| {
+             let metadata = ExecutorMetadata {
+                 id: format!("executor-{}", i),
+                 host: format!("host-{}", i),
+                 port: 8080,
+                 grpc_port: 9090,
+                 specification: ExecutorSpecification {
+                     task_slots: slots_per_executor,
+                 },
+             };
+             let data = ExecutorData {
+                 executor_id: format!("executor-{}", i),
+                 total_task_slots: slots_per_executor,
+                 available_task_slots: slots_per_executor,
+             };
+             (metadata, data)
+         })
+         .collect()
+ }
+
+ /// Build the physical plan used by the tests: an empty scan over
+ /// `(id: Utf8, gmv: UInt64)`, grouped by `id` and summing `gmv`.
+ ///
+ /// Panics if any planning step fails.
+ async fn test_graph(ctx: Arc<SessionContext>) -> Arc<dyn ExecutionPlan> {
+     let fields = vec![
+         Field::new("id", DataType::Utf8, false),
+         Field::new("gmv", DataType::UInt64, false),
+     ];
+     let schema = Schema::new(fields);
+
+     let scan = scan_empty(None, &schema, Some(vec![0, 1])).unwrap();
+     let logical_plan = scan
+         .aggregate(vec![col("id")], vec![sum(col("gmv"))])
+         .unwrap()
+         .build()
+         .unwrap();
+
+     ctx.create_physical_plan(&logical_plan).await.unwrap()
+ }
+}
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs
b/ballista/rust/scheduler/src/state/task_manager.rs
index 480f9369..25357f38 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -133,7 +133,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
&self,
executor: &ExecutorMetadata,
task_status: Vec<TaskStatus>,
- ) -> Result<(Vec<QueryStageSchedulerEvent>, Vec<ExecutorReservation>)> {
+ ) -> Result<Vec<QueryStageSchedulerEvent>> {
let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
for status in task_status {
debug!("Task Update\n{:?}", status);
@@ -147,13 +147,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
let mut events: Vec<QueryStageSchedulerEvent> = vec![];
- let mut total_num_tasks = 0;
for (job_id, statuses) in job_updates {
let num_tasks = statuses.len();
debug!("Updating {} tasks in job {}", num_tasks, job_id);
- total_num_tasks += num_tasks;
-
let graph = self.get_active_execution_graph(&job_id).await;
let job_event = if let Some(graph) = graph {
let mut graph = graph.write().await;
@@ -169,11 +166,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
}
}
- let reservation = (0..total_num_tasks)
- .into_iter()
- .map(|_| ExecutorReservation::new_free(executor.id.to_owned()))
- .collect();
- Ok((events, reservation))
+ Ok(events)
}
/// Take a list of executor reservations and fill them with tasks that are
ready
diff --git a/ballista/rust/scheduler/src/test_utils.rs
b/ballista/rust/scheduler/src/test_utils.rs
index c60e8ff3..c115ce72 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -15,16 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use ballista_core::error::{BallistaError, Result};
+use ballista_core::error::Result;
use std::any::Any;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
-use crate::scheduler_server::event::SchedulerServerEvent;
-
use async_trait::async_trait;
-use ballista_core::event_loop::EventAction;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::DataFusionError;
@@ -33,50 +30,11 @@ use datafusion::execution::context::{SessionConfig,
SessionContext, SessionState
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::CsvReadOptions;
-use tokio::sync::mpsc::{Receiver, Sender};
pub const TPCH_TABLES: &[&str] = &[
"part", "supplier", "partsupp", "customer", "orders", "lineitem",
"nation", "region",
];
-/// Test utility that allows observing scheduler events.
-pub struct SchedulerEventObserver {
- sender: Sender<SchedulerServerEvent>,
- errors: Sender<BallistaError>,
-}
-
-impl SchedulerEventObserver {
- pub fn new(
- sender: Sender<SchedulerServerEvent>,
- errors: Sender<BallistaError>,
- ) -> Self {
- Self { sender, errors }
- }
-}
-
-#[async_trait]
-impl EventAction<SchedulerServerEvent> for SchedulerEventObserver {
- fn on_start(&self) {}
-
- fn on_stop(&self) {}
-
- async fn on_receive(
- &self,
- event: SchedulerServerEvent,
- _tx_event: &Sender<SchedulerServerEvent>,
- _rx_event: &Receiver<SchedulerServerEvent>,
- ) -> Result<()> {
- self.sender.send(event).await.unwrap();
-
- Ok(())
- }
-
- fn on_error(&self, error: BallistaError) {
- let errors = self.errors.clone();
- tokio::task::spawn(async move { errors.send(error).await.unwrap() });
- }
-}
-
/// Sometimes we need to construct logical plans that will produce errors
/// when we try and create physical plan. A scan using `ExplodingTableProvider`
/// will do the trick