This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 93cf14f7 Cluster state refactor part 1 (#560)
93cf14f7 is described below
commit 93cf14f707420eabfc6de751cfd7bf10afec164c
Author: Dan Harris <[email protected]>
AuthorDate: Sat Jan 21 08:44:49 2023 -0500
Cluster state refactor part 1 (#560)
* Customize session builder
* Add setter for executor slots policy
* Construct Executor with functions
* Add queued and completed timestamps to successful job status
* Add public methods to SchedulerServer
* Public method for getting execution graph
* Public method for stage metrics
* Use node-level local limit (#20)
* Use node-level local limit
* serialize limit in shuffle writer
* Revert "Merge pull request #19 from coralogix/sc-5792"
This reverts commit 08140ef3f1ebe573ce9f0d2f3253422546c8a31a, reversing
changes made to a7f13844e44328203f803e5073cfff69f3981f4e.
* add log
* make sure we don't forget limit for shuffle writer
* update accum correctly and try to break early
* Check local limit accumulator before polling for more data
* fix build
Co-authored-by: Martins Purins <[email protected]>
* configure_me_codegen retroactively reserved on our `bind_host` parameter…
(#520)
* configure_me_codegen retroactively reserved on our `bind_host` parameter
name
* Add label and pray
* Add more labels why not
* Add ClusterState trait
* Refactor slightly for clarity
* Revert "Use node-level local limit (#20)"
This reverts commit ff96bcd5b4294e71babe98fb177da84a8423ee82.
* Revert "Public method for stage metrics"
This reverts commit a80231509c478c0c82972b2ff24782804862c04b.
* Revert "Public method for getting execution graph"
This reverts commit 490bda5aae4b79ab1bd7fbd564281929ecb693ca.
* Revert "Add public methods to SchedulerServer"
This reverts commit 5ad27c04e80a2db55a9bb29444f98da6b16c395c.
* Revert "Add queued and completed timestamps to successful job status"
This reverts commit c615fcecb1eb2ded6c14ecdd5511bd7a76ccfb85.
* Revert "Construct Executor with functions"
This reverts commit 24d4830417fff730f1e4998533aa5ff3aec8fefc.
* Always forget the apache header
Co-authored-by: Martins Purins <[email protected]>
Co-authored-by: Brent Gardner <[email protected]>
---
ballista/core/src/serde/generated/ballista.rs | 4 +-
ballista/scheduler/scheduler_config_spec.toml | 7 +
ballista/scheduler/src/bin/main.rs | 50 +-
ballista/scheduler/src/config.rs | 5 +
ballista/scheduler/src/scheduler_process.rs | 3 +
ballista/scheduler/src/scheduler_server/grpc.rs | 13 +-
ballista/scheduler/src/scheduler_server/mod.rs | 44 +-
ballista/scheduler/src/standalone.rs | 6 +-
ballista/scheduler/src/state/backend/cluster.rs | 710 +++++++++++++++++++++++
ballista/scheduler/src/state/backend/mod.rs | 14 +-
ballista/scheduler/src/state/executor_manager.rs | 478 ++++-----------
ballista/scheduler/src/state/mod.rs | 16 +-
ballista/scheduler/src/test_utils.rs | 5 +-
13 files changed, 969 insertions(+), 386 deletions(-)
diff --git a/ballista/core/src/serde/generated/ballista.rs
b/ballista/core/src/serde/generated/ballista.rs
index d11c6ea5..8f0bc9bc 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -1985,7 +1985,7 @@ pub mod executor_grpc_client {
pub mod scheduler_grpc_server {
#![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
use tonic::codegen::*;
- /// Generated trait containing gRPC methods that should be implemented for
use with SchedulerGrpcServer.
+ ///Generated trait containing gRPC methods that should be implemented for
use with SchedulerGrpcServer.
#[async_trait]
pub trait SchedulerGrpc: Send + Sync + 'static {
/// Executors must poll the scheduler for heartbeat and to receive
tasks
@@ -2531,7 +2531,7 @@ pub mod scheduler_grpc_server {
pub mod executor_grpc_server {
#![allow(unused_variables, dead_code, missing_docs,
clippy::let_unit_value)]
use tonic::codegen::*;
- /// Generated trait containing gRPC methods that should be implemented for
use with ExecutorGrpcServer.
+ ///Generated trait containing gRPC methods that should be implemented for
use with ExecutorGrpcServer.
#[async_trait]
pub trait ExecutorGrpc: Send + Sync + 'static {
async fn launch_task(
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index e79abb85..0bb609cf 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -36,6 +36,13 @@ type = "ballista_scheduler::state::backend::StateBackend"
doc = "The configuration backend for the scheduler, possible values: etcd,
memory, sled. Default: sled"
default = "ballista_scheduler::state::backend::StateBackend::Sled"
+[[param]]
+abbr = "c"
+name = "cluster_backend"
+type = "ballista_scheduler::state::backend::StateBackend"
+doc = "The configuration backend for the scheduler cluster state, possible
values: etcd, memory, sled. Default: sled"
+default = "ballista_scheduler::state::backend::StateBackend::Sled"
+
[[param]]
abbr = "n"
name = "namespace"
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index 3fd1e060..2ad29bd2 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -45,6 +45,7 @@ mod config {
use ballista_core::config::LogRotationPolicy;
use ballista_scheduler::config::SchedulerConfig;
+use ballista_scheduler::state::backend::cluster::DefaultClusterState;
use config::prelude::*;
use tracing_subscriber::EnvFilter;
@@ -60,6 +61,16 @@ async fn main() -> Result<()> {
std::process::exit(0);
}
+ let config_backend = init_kv_backend(&opt.config_backend, &opt).await?;
+
+ let cluster_state = if opt.cluster_backend == opt.config_backend {
+ Arc::new(DefaultClusterState::new(config_backend.clone()))
+ } else {
+ let cluster_kv_store = init_kv_backend(&opt.cluster_backend,
&opt).await?;
+
+ Arc::new(DefaultClusterState::new(cluster_kv_store))
+ };
+
let special_mod_log_level = opt.log_level_setting;
let namespace = opt.namespace;
let external_host = opt.external_host;
@@ -110,13 +121,31 @@ async fn main() -> Result<()> {
let addr = format!("{}:{}", bind_host, port);
let addr = addr.parse()?;
- let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend
{
+ let config = SchedulerConfig {
+ scheduling_policy: opt.scheduler_policy,
+ event_loop_buffer_size: opt.event_loop_buffer_size,
+ executor_slots_policy: opt.executor_slots_policy,
+ finished_job_data_clean_up_interval_seconds: opt
+ .finished_job_data_clean_up_interval_seconds,
+ finished_job_state_clean_up_interval_seconds: opt
+ .finished_job_state_clean_up_interval_seconds,
+ advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
+ };
+ start_server(scheduler_name, config_backend, cluster_state, addr,
config).await?;
+ Ok(())
+}
+
+async fn init_kv_backend(
+ backend: &StateBackend,
+ opt: &Config,
+) -> Result<Arc<dyn StateBackendClient>> {
+ let cluster_backend: Arc<dyn StateBackendClient> = match backend {
#[cfg(feature = "etcd")]
StateBackend::Etcd => {
- let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
+ let etcd = etcd_client::Client::connect(&[opt.etcd_urls.clone()],
None)
.await
.context("Could not connect to etcd")?;
- Arc::new(EtcdClient::new(namespace.clone(), etcd))
+ Arc::new(EtcdClient::new(opt.namespace.clone(), etcd))
}
#[cfg(not(feature = "etcd"))]
StateBackend::Etcd => {
@@ -134,7 +163,7 @@ async fn main() -> Result<()> {
} else {
println!("{}", opt.sled_dir);
Arc::new(
- SledClient::try_new(opt.sled_dir)
+ SledClient::try_new(opt.sled_dir.clone())
.context("Could not create sled config backend")?,
)
}
@@ -148,16 +177,5 @@ async fn main() -> Result<()> {
StateBackend::Memory => Arc::new(MemoryBackendClient::new()),
};
- let config = SchedulerConfig {
- scheduling_policy: opt.scheduler_policy,
- event_loop_buffer_size: opt.event_loop_buffer_size,
- executor_slots_policy: opt.executor_slots_policy,
- finished_job_data_clean_up_interval_seconds: opt
- .finished_job_data_clean_up_interval_seconds,
- finished_job_state_clean_up_interval_seconds: opt
- .finished_job_state_clean_up_interval_seconds,
- advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
- };
- start_server(scheduler_name, config_backend, addr, config).await?;
- Ok(())
+ Ok(cluster_backend)
}
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 8ef33219..97f79787 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -90,6 +90,11 @@ impl SchedulerConfig {
self.advertise_flight_sql_endpoint = endpoint;
self
}
+
+    /// Builder-style setter for the executor slots policy: consumes the config,
+    /// replaces `executor_slots_policy` with `policy`, and returns the updated config
+    /// so calls can be chained.
+    pub fn with_executor_slots_policy(self, policy: SlotsPolicy) -> Self {
+        Self {
+            executor_slots_policy: policy,
+            ..self
+        }
+    }
}
// an enum used to configure the executor slots policy
diff --git a/ballista/scheduler/src/scheduler_process.rs
b/ballista/scheduler/src/scheduler_process.rs
index 06057871..44cf7bf4 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -41,11 +41,13 @@ use crate::flight_sql::FlightSqlServiceImpl;
use crate::metrics::default_metrics_collector;
use
crate::scheduler_server::externalscaler::external_scaler_server::ExternalScalerServer;
use crate::scheduler_server::SchedulerServer;
+use crate::state::backend::cluster::ClusterState;
use crate::state::backend::StateBackendClient;
pub async fn start_server(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
+ cluster_state: Arc<dyn ClusterState>,
addr: SocketAddr,
config: SchedulerConfig,
) -> Result<()> {
@@ -65,6 +67,7 @@ pub async fn start_server(
SchedulerServer::new(
scheduler_name,
config_backend.clone(),
+ cluster_state,
BallistaCodec::default(),
config,
metrics_collector,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 979b65f7..ad04efb2 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -588,6 +588,7 @@ mod test {
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::default_session_builder;
+ use crate::state::backend::cluster::DefaultClusterState;
use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
use crate::state::{backend::sled::SledClient, SchedulerState};
@@ -596,10 +597,12 @@ mod test {
#[tokio::test]
async fn test_poll_work() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
+ let cluster_state =
Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
+ cluster_state.clone(),
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
@@ -627,6 +630,7 @@ mod test {
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
+ cluster_state.clone(),
default_session_builder,
BallistaCodec::default(),
);
@@ -660,6 +664,7 @@ mod test {
let state: SchedulerState<LogicalPlanNode, PhysicalPlanNode> =
SchedulerState::new_with_default_scheduler_name(
state_storage.clone(),
+ cluster_state,
default_session_builder,
BallistaCodec::default(),
);
@@ -683,10 +688,12 @@ mod test {
#[tokio::test]
async fn test_stop_executor() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
+ let cluster_state =
Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
- state_storage.clone(),
+ state_storage,
+ cluster_state,
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
@@ -764,10 +771,12 @@ mod test {
#[ignore]
async fn test_expired_executor() -> Result<(), BallistaError> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
+ let cluster_state =
Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
- state_storage.clone(),
+ state_storage,
+ cluster_state,
BallistaCodec::default(),
SchedulerConfig::default(),
default_metrics_collector().unwrap(),
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index f4727f40..f39e0d1b 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -35,6 +35,7 @@ use log::{error, warn};
use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
+use crate::state::backend::cluster::ClusterState;
use crate::state::backend::StateBackendClient;
use crate::state::executor_manager::{
ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
@@ -69,12 +70,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
pub fn new(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
+ cluster_state: Arc<dyn ClusterState>,
codec: BallistaCodec<T, U>,
config: SchedulerConfig,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
) -> Self {
let state = Arc::new(SchedulerState::new(
config_backend,
+ cluster_state,
default_session_builder,
codec,
scheduler_name.clone(),
@@ -97,10 +100,45 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
}
}
+    /// Create a `SchedulerServer` like `new`, but with a caller-supplied
+    /// `session_builder` instead of `default_session_builder`.
+    ///
+    /// `config_backend`, `cluster_state`, `session_builder`, `codec` and a clone of
+    /// `config` are passed through to `SchedulerState::new`. A `QueryStageScheduler`
+    /// wrapping that state drives the "query_stage" event loop, whose buffer size
+    /// comes from `config.event_loop_buffer_size`.
+    pub fn with_session_builder(
+        scheduler_name: String,
+        config_backend: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
+        codec: BallistaCodec<T, U>,
+        config: SchedulerConfig,
+        session_builder: SessionBuilder,
+        metrics_collector: Arc<dyn SchedulerMetricsCollector>,
+    ) -> Self {
+        let state = Arc::new(SchedulerState::new(
+            config_backend,
+            cluster_state,
+            session_builder,
+            codec,
+            scheduler_name.clone(),
+            config.clone(),
+        ));
+        let query_stage_scheduler =
+            Arc::new(QueryStageScheduler::new(state.clone(), metrics_collector));
+        let query_stage_event_loop = EventLoop::new(
+            "query_stage".to_owned(),
+            config.event_loop_buffer_size as usize,
+            query_stage_scheduler.clone(),
+        );
+
+        Self {
+            scheduler_name,
+            start_time: timestamp_millis() as u128,
+            state,
+            query_stage_event_loop,
+            query_stage_scheduler,
+        }
+    }
+
#[allow(dead_code)]
pub(crate) fn with_task_launcher(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
+ cluster_backend: Arc<dyn ClusterState>,
codec: BallistaCodec<T, U>,
config: SchedulerConfig,
metrics_collector: Arc<dyn SchedulerMetricsCollector>,
@@ -108,6 +146,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
) -> Self {
let state = Arc::new(SchedulerState::with_task_launcher(
config_backend,
+ cluster_backend,
default_session_builder,
codec,
scheduler_name.clone(),
@@ -330,6 +369,7 @@ mod test {
use ballista_core::serde::BallistaCodec;
use crate::scheduler_server::{timestamp_millis, SchedulerServer};
+ use crate::state::backend::cluster::DefaultClusterState;
use crate::state::backend::sled::SledClient;
use crate::test_utils::{
@@ -599,10 +639,12 @@ mod test {
scheduling_policy: TaskSchedulingPolicy,
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(SledClient::try_new_temporary()?);
+ let cluster_state =
Arc::new(DefaultClusterState::new(state_storage.clone()));
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
- state_storage.clone(),
+ state_storage,
+ cluster_state,
BallistaCodec::default(),
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
Arc::new(TestMetricsCollector::default()),
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index 43f3e505..0fe608f8 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -17,6 +17,7 @@
use crate::config::SchedulerConfig;
use crate::metrics::default_metrics_collector;
+use crate::state::backend::cluster::DefaultClusterState;
use crate::{scheduler_server::SchedulerServer,
state::backend::sled::SledClient};
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;
@@ -31,14 +32,15 @@ use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
- let client = SledClient::try_new_temporary()?;
+ let backend = Arc::new(SledClient::try_new_temporary()?);
let metrics_collector = default_metrics_collector()?;
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
SchedulerServer::new(
"localhost:50050".to_owned(),
- Arc::new(client),
+ backend.clone(),
+ Arc::new(DefaultClusterState::new(backend)),
BallistaCodec::default(),
SchedulerConfig::default(),
metrics_collector,
diff --git a/ballista/scheduler/src/state/backend/cluster.rs
b/ballista/scheduler/src/state/backend/cluster.rs
new file mode 100644
index 00000000..2370c492
--- /dev/null
+++ b/ballista/scheduler/src/state/backend/cluster.rs
@@ -0,0 +1,710 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::state::backend::{
+ Keyspace, Operation, StateBackendClient, TaskDistribution, WatchEvent,
+};
+use crate::state::executor_manager::ExecutorReservation;
+use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
+use ballista_core::error;
+use ballista_core::error::BallistaError;
+use ballista_core::serde::protobuf;
+use ballista_core::serde::protobuf::{
+ executor_status, ExecutorHeartbeat, ExecutorStatus,
+};
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use futures::{Stream, StreamExt};
+use log::{debug, info};
+use std::collections::{HashMap, HashSet};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+/// Pinned, boxed, `Send` stream of executor heartbeats, as returned by
+/// `ClusterState::executor_heartbeat_stream`.
+pub type ExecutorHeartbeatStream = Pin<Box<dyn Stream<Item =
ExecutorHeartbeat> + Send>>;
+
+/// A trait that contains the methods necessary to maintain a globally consistent view
+/// of cluster resources: executor metadata, executor heartbeats and executor task slots.
+#[tonic::async_trait]
+pub trait ClusterState: Send + Sync {
+ /// Reserve up to `num_slots` executor task slots. If not enough task slots are
+ /// available, reserve as many as possible.
+ ///
+ /// `distribution` selects the reservation strategy (e.g. `TaskDistribution::Bias`
+ /// or `TaskDistribution::RoundRobin`).
+ /// If `executors` is provided, only reserve slots of the specified executor IDs.
+ async fn reserve_slots(
+ &self,
+ num_slots: u32,
+ distribution: TaskDistribution,
+ executors: Option<HashSet<String>>,
+ ) -> error::Result<Vec<ExecutorReservation>>;
+
+ /// Reserve exactly `num_slots` executor task slots. If not enough task slots are
+ /// available, reserve nothing and return an empty vec.
+ ///
+ /// If `executors` is provided, only reserve slots of the specified executor IDs.
+ async fn reserve_slots_exact(
+ &self,
+ num_slots: u32,
+ distribution: TaskDistribution,
+ executors: Option<HashSet<String>>,
+ ) -> error::Result<Vec<ExecutorReservation>>;
+
+ /// Cancel the specified reservations, making the reserved executor slots available
+ /// to other tasks again.
+ ///
+ /// This operation should be atomic: either all reservations are cancelled or none are.
+ async fn cancel_reservations(
+ &self,
+ reservations: Vec<ExecutorReservation>,
+ ) -> error::Result<()>;
+
+ /// Register a new executor in the cluster. If `reserve` is true, all of the
+ /// executor's task slots are reserved and returned in the response, and none of
+ /// them will be available to other tasks.
+ async fn register_executor(
+ &self,
+ metadata: ExecutorMetadata,
+ spec: ExecutorData,
+ reserve: bool,
+ ) -> error::Result<Vec<ExecutorReservation>>;
+
+ /// Save the executor metadata, overwriting any existing metadata for the same
+ /// executor ID.
+ async fn save_executor_metadata(
+ &self,
+ metadata: ExecutorMetadata,
+ ) -> error::Result<()>;
+
+ /// Get the executor metadata for the provided executor ID. Returns an error if the
+ /// executor does not exist.
+ async fn get_executor_metadata(
+ &self,
+ executor_id: &str,
+ ) -> error::Result<ExecutorMetadata>;
+
+ /// Save the executor heartbeat.
+ async fn save_executor_heartbeat(
+ &self,
+ heartbeat: ExecutorHeartbeat,
+ ) -> error::Result<()>;
+
+ /// Remove the executor from the cluster.
+ async fn remove_executor(&self, executor_id: &str) -> error::Result<()>;
+
+ /// Return the stream of executor heartbeats observed by all schedulers in the
+ /// cluster. This can be aggregated to provide an eventually consistent view of all
+ /// executors within the cluster.
+ async fn executor_heartbeat_stream(&self) ->
error::Result<ExecutorHeartbeatStream>;
+
+ /// Return a map of the last seen heartbeat for all active executors.
+ async fn executor_heartbeats(
+ &self,
+ ) -> error::Result<HashMap<String, ExecutorHeartbeat>>;
+}
+
+/// Default implementation of `ClusterState` backed by the key-value interface defined
+/// in `StateBackendClient`.
+pub struct DefaultClusterState {
+ /// Key-value store holding executor metadata (`Keyspace::Executors`), heartbeats
+ /// (`Keyspace::Heartbeats`) and task-slot data (`Keyspace::Slots`).
+ kv_store: Arc<dyn StateBackendClient>,
+}
+
+impl DefaultClusterState {
+    /// Build a cluster state that keeps all of its data in the given key-value store.
+    pub fn new(kv_store: Arc<dyn StateBackendClient>) -> Self {
+        DefaultClusterState { kv_store }
+    }
+}
+
+#[tonic::async_trait]
+impl ClusterState for DefaultClusterState {
+    /// Reserve up to `num_slots` slots while holding the global `Slots` lock and
+    /// persist the updated slot counts in a single transaction.
+    async fn reserve_slots(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<Vec<ExecutorReservation>> {
+        let lock = self.kv_store.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            debug!("Attempting to reserve {} executor slots", num_slots);
+            let start = Instant::now();
+
+            let (reservations, txn_ops) = self
+                .plan_reservations(num_slots, distribution, executors)
+                .await?;
+
+            self.kv_store.apply_txn(txn_ops).await?;
+
+            let elapsed = start.elapsed();
+            info!(
+                "Reserved {} executor slots in {:?}",
+                reservations.len(),
+                elapsed
+            );
+
+            Ok(reservations)
+        })
+        .await
+    }
+
+    /// Reserve exactly `num_slots` slots under the global `Slots` lock. The planned
+    /// transaction is only applied when the full amount could be reserved; otherwise
+    /// nothing is written and an empty vec is returned.
+    async fn reserve_slots_exact(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<Vec<ExecutorReservation>> {
+        let lock = self.kv_store.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            debug!("Attempting to reserve {} executor slots", num_slots);
+            let start = Instant::now();
+
+            let (reservations, txn_ops) = self
+                .plan_reservations(num_slots, distribution, executors)
+                .await?;
+
+            let elapsed = start.elapsed();
+            if reservations.len() as u32 == num_slots {
+                self.kv_store.apply_txn(txn_ops).await?;
+
+                info!(
+                    "Reserved {} executor slots in {:?}",
+                    reservations.len(),
+                    elapsed
+                );
+
+                Ok(reservations)
+            } else {
+                // Not enough free slots: drop the planned transaction so no slot is
+                // taken. Log the requested amount (not the partial count) so the
+                // message states what actually failed.
+                info!(
+                    "Failed to reserve exactly {} executor slots in {:?}, only {} could be reserved",
+                    num_slots,
+                    elapsed,
+                    reservations.len()
+                );
+
+                Ok(vec![])
+            }
+        })
+        .await
+    }
+
+    /// Return one slot per reservation to its executor under the global `Slots` lock,
+    /// writing all updated slot records in a single transaction.
+    async fn cancel_reservations(
+        &self,
+        reservations: Vec<ExecutorReservation>,
+    ) -> error::Result<()> {
+        let lock = self.kv_store.lock(Keyspace::Slots, "global").await?;
+
+        with_lock(lock, async {
+            let num_reservations = reservations.len();
+            debug!("Cancelling {} reservations", num_reservations);
+            let start = Instant::now();
+
+            // Accumulate freed slots per executor so each executor's slot record is
+            // read and written at most once.
+            let mut executor_slots: HashMap<String, ExecutorData> = HashMap::new();
+
+            for reservation in reservations {
+                let executor_id = &reservation.executor_id;
+                if let Some(data) = executor_slots.get_mut(executor_id) {
+                    data.available_task_slots += 1;
+                } else {
+                    let value = self.kv_store.get(Keyspace::Slots, executor_id).await?;
+                    let mut data =
+                        decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
+                    data.available_task_slots += 1;
+                    executor_slots.insert(executor_id.clone(), data);
+                }
+            }
+
+            let txn_ops: Vec<(Operation, Keyspace, String)> = executor_slots
+                .into_iter()
+                .map(|(executor_id, data)| {
+                    let proto: protobuf::ExecutorData = data.into();
+                    let new_data = encode_protobuf(&proto)?;
+                    Ok((Operation::Put(new_data), Keyspace::Slots, executor_id))
+                })
+                .collect::<error::Result<Vec<_>>>()?;
+
+            self.kv_store.apply_txn(txn_ops).await?;
+
+            let elapsed = start.elapsed();
+            info!(
+                "Cancelled {} reservations in {:?}",
+                num_reservations, elapsed
+            );
+
+            Ok(())
+        })
+        .await
+    }
+
+    /// Store the executor's metadata, an `Active` heartbeat stamped with the current
+    /// time, and its slot data. With `reserve`, every slot of the executor is returned
+    /// as a reservation and the executor is stored with zero available slots.
+    async fn register_executor(
+        &self,
+        metadata: ExecutorMetadata,
+        spec: ExecutorData,
+        reserve: bool,
+    ) -> error::Result<Vec<ExecutorReservation>> {
+        let executor_id = metadata.id.clone();
+        let current_ts = Self::current_timestamp_secs()?;
+
+        //TODO this should be in a transaction
+        // Now that we know we can connect, save the metadata and slots
+        self.save_executor_metadata(metadata).await?;
+        self.save_executor_heartbeat(ExecutorHeartbeat {
+            executor_id: executor_id.clone(),
+            timestamp: current_ts,
+            metrics: vec![],
+            status: Some(ExecutorStatus {
+                status: Some(executor_status::Status::Active("".to_string())),
+            }),
+        })
+        .await?;
+
+        let mut specification = spec;
+        let reservations: Vec<ExecutorReservation> = if reserve {
+            let num_slots = specification.available_task_slots as usize;
+            specification.available_task_slots = 0;
+            (0..num_slots)
+                .map(|_| ExecutorReservation::new_free(executor_id.clone()))
+                .collect()
+        } else {
+            vec![]
+        };
+
+        // NOTE(review): this write to `Keyspace::Slots` is not made under the global
+        // `Slots` lock that the reserve/cancel paths use; confirm that is acceptable.
+        let proto: protobuf::ExecutorData = specification.into();
+        let value = encode_protobuf(&proto)?;
+        self.kv_store
+            .put(Keyspace::Slots, executor_id, value)
+            .await?;
+
+        Ok(reservations)
+    }
+
+    /// Persist `metadata` under `Keyspace::Executors`, keyed by the executor ID.
+    async fn save_executor_metadata(
+        &self,
+        metadata: ExecutorMetadata,
+    ) -> error::Result<()> {
+        let executor_id = metadata.id.clone();
+        let proto: protobuf::ExecutorMetadata = metadata.into();
+        let value = encode_protobuf(&proto)?;
+
+        self.kv_store
+            .put(Keyspace::Executors, executor_id, value)
+            .await
+    }
+
+    /// Load and decode the metadata stored under `Keyspace::Executors` for `executor_id`.
+    async fn get_executor_metadata(
+        &self,
+        executor_id: &str,
+    ) -> error::Result<ExecutorMetadata> {
+        let value = self.kv_store.get(Keyspace::Executors, executor_id).await?;
+
+        let decoded =
+            decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
+        Ok(decoded)
+    }
+
+    /// Persist `heartbeat` under `Keyspace::Heartbeats`, keyed by its executor ID.
+    async fn save_executor_heartbeat(
+        &self,
+        heartbeat: ExecutorHeartbeat,
+    ) -> error::Result<()> {
+        let executor_id = heartbeat.executor_id.clone();
+        let value = encode_protobuf(&heartbeat)?;
+        self.kv_store
+            .put(Keyspace::Heartbeats, executor_id, value)
+            .await
+    }
+
+    /// Mark the executor as removed by overwriting its heartbeat with a `Dead` status
+    /// stamped with the current time. Slot and metadata records are left in place.
+    async fn remove_executor(&self, executor_id: &str) -> error::Result<()> {
+        let current_ts = Self::current_timestamp_secs()?;
+
+        let value = encode_protobuf(&ExecutorHeartbeat {
+            executor_id: executor_id.to_owned(),
+            timestamp: current_ts,
+            metrics: vec![],
+            status: Some(ExecutorStatus {
+                status: Some(executor_status::Status::Dead("".to_string())),
+            }),
+        })?;
+        self.kv_store
+            .put(Keyspace::Heartbeats, executor_id.to_owned(), value)
+            .await?;
+
+        // TODO Check the Executor reservation logic for push-based scheduling
+
+        Ok(())
+    }
+
+    /// Watch `Keyspace::Heartbeats` and yield every heartbeat that is `Put`.
+    /// Deletes and values that fail to decode are skipped rather than ending the stream.
+    async fn executor_heartbeat_stream(&self) -> error::Result<ExecutorHeartbeatStream> {
+        let events = self
+            .kv_store
+            .watch(Keyspace::Heartbeats, String::default())
+            .await?;
+
+        Ok(events
+            .filter_map(|event| {
+                futures::future::ready(match event {
+                    WatchEvent::Put(_, value) => {
+                        decode_protobuf::<ExecutorHeartbeat>(&value).ok()
+                    }
+                    WatchEvent::Delete(_) => None,
+                })
+            })
+            .boxed())
+    }
+
+    /// Scan all stored heartbeats and return those whose status is `Active`, keyed by
+    /// executor ID. Any heartbeat that fails to decode aborts with an error.
+    async fn executor_heartbeats(
+        &self,
+    ) -> error::Result<HashMap<String, ExecutorHeartbeat>> {
+        let heartbeats = self.kv_store.scan(Keyspace::Heartbeats, None).await?;
+
+        let mut heartbeat_map = HashMap::with_capacity(heartbeats.len());
+
+        for (_, value) in heartbeats {
+            let data: ExecutorHeartbeat = decode_protobuf(&value)?;
+            if let Some(ExecutorStatus {
+                status: Some(executor_status::Status::Active(_)),
+            }) = &data.status
+            {
+                heartbeat_map.insert(data.executor_id.clone(), data);
+            }
+        }
+
+        Ok(heartbeat_map)
+    }
+}
+
+impl DefaultClusterState {
+    /// When the caller does not restrict the executor set, only executors whose last
+    /// heartbeat is newer than this many seconds are considered for reservations.
+    const ALIVE_EXECUTOR_THRESHOLD_SECS: u64 = 60;
+
+    /// Compute (but do not apply) the reservations and `Slots` write operations for
+    /// `num_slots` slots using the requested `distribution`.
+    ///
+    /// If `executors` is `None`, the candidate set is the active executors that
+    /// heartbeated within `ALIVE_EXECUTOR_THRESHOLD_SECS`. Callers are expected to hold
+    /// the global `Slots` lock while calling this and applying the result.
+    async fn plan_reservations(
+        &self,
+        num_slots: u32,
+        distribution: TaskDistribution,
+        executors: Option<HashSet<String>>,
+    ) -> error::Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+        let executors = match executors {
+            Some(executors) => executors,
+            None => {
+                let heartbeats = self.executor_heartbeats().await?;
+                get_alive_executors(Self::ALIVE_EXECUTOR_THRESHOLD_SECS, heartbeats)?
+            }
+        };
+
+        match distribution {
+            TaskDistribution::Bias => {
+                reserve_slots_bias(self.kv_store.as_ref(), num_slots, executors).await
+            }
+            TaskDistribution::RoundRobin => {
+                reserve_slots_round_robin(self.kv_store.as_ref(), num_slots, executors)
+                    .await
+            }
+        }
+    }
+
+    /// Seconds since the UNIX epoch, or an internal error if the system clock reads
+    /// earlier than the epoch.
+    fn current_timestamp_secs() -> error::Result<u64> {
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .map(|elapsed| elapsed.as_secs())
+            .map_err(|e| {
+                BallistaError::Internal(format!(
+                    "Error getting current timestamp: {:?}",
+                    e
+                ))
+            })
+    }
+}
+
+/// Return the set of executor IDs which have heartbeated within
`last_seen_threshold` seconds
+fn get_alive_executors(
+ last_seen_threshold: u64,
+ heartbeats: HashMap<String, ExecutorHeartbeat>,
+) -> error::Result<HashSet<String>> {
+ let now_epoch_ts = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards");
+
+ let last_seen_threshold = now_epoch_ts
+ .checked_sub(Duration::from_secs(last_seen_threshold))
+ .ok_or_else(|| {
+ BallistaError::Internal(format!(
+ "Error getting alive executors, invalid last_seen_threshold of
{}",
+ last_seen_threshold
+ ))
+ })?
+ .as_secs();
+
+ Ok(heartbeats
+ .iter()
+ .filter_map(|(exec, heartbeat)| {
+ (heartbeat.timestamp > last_seen_threshold).then(|| exec.clone())
+ })
+ .collect())
+}
+
+/// Reserve up to `n` slots by taking as many as possible from one executor before
+/// moving on to the next ("bias" distribution). Executors are visited in `HashSet`
+/// iteration order, which is unspecified.
+///
+/// This touches fewer executors, so fewer `ExecutorData` records are decoded and
+/// re-encoded, but it can leave the cluster unbalanced: some executors may be very
+/// busy while others sit idle.
+///
+/// Returns the reservations together with `Put` operations for the reduced slot
+/// counts. Only executors that actually gave up at least one slot are rewritten.
+/// Nothing is written to the backend here; the caller applies the operations.
+async fn reserve_slots_bias(
+    state: &dyn StateBackendClient,
+    mut n: u32,
+    executors: HashSet<String>,
+) -> error::Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+    let mut reservations: Vec<ExecutorReservation> = vec![];
+    let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
+
+    for executor_id in executors {
+        if n == 0 {
+            break;
+        }
+
+        let value = state.get(Keyspace::Slots, &executor_id).await?;
+        let mut data = decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
+        let take = std::cmp::min(data.available_task_slots, n);
+
+        // Nothing reservable here: don't emit a write that would only re-store the
+        // same record (or persist a decoded default if no slot data was stored).
+        if take == 0 {
+            continue;
+        }
+
+        reservations
+            .extend((0..take).map(|_| ExecutorReservation::new_free(executor_id.clone())));
+        data.available_task_slots -= take;
+        n -= take;
+
+        let proto: protobuf::ExecutorData = data.into();
+        let new_data = encode_protobuf(&proto)?;
+        txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
+    }
+
+    Ok((reservations, txn_ops))
+}
+
+/// Reserve up to `n` slots round robin: each pass hands out one slot per executor, so
+/// reservations are spread evenly across the executors in `executors`.
+///
+/// Only executors that are listed in `executors` and currently have free slots take
+/// part. Returns the reservations together with `Put` operations that rewrite the
+/// slot data of every executor that gave up at least one slot. Nothing is written to
+/// the backend here; the caller applies the operations.
+async fn reserve_slots_round_robin(
+    state: &dyn StateBackendClient,
+    mut n: u32,
+    executors: HashSet<String>,
+) -> error::Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
+    let mut reservations: Vec<ExecutorReservation> = Vec::new();
+
+    let mut candidates: Vec<ExecutorData> = Vec::new();
+    for (_, raw) in state.scan(Keyspace::Slots, None).await? {
+        let data = decode_into::<protobuf::ExecutorData, ExecutorData>(&raw)?;
+        if data.available_task_slots > 0 && executors.contains(&data.executor_id) {
+            candidates.push(data);
+        }
+    }
+
+    // Executors with the most free slots first (stable sort, descending).
+    candidates.sort_by_key(|data| std::cmp::Reverse(data.available_task_slots));
+
+    // Count of leading candidates that gave up at least one slot. Each pass starts at
+    // index 0 and stops at the first executor without free slots, so the touched
+    // executors always form a prefix of `candidates`.
+    let mut touched = 0usize;
+    loop {
+        let remaining_before_pass = n;
+        for (idx, data) in candidates.iter_mut().enumerate() {
+            if n == 0 || data.available_task_slots == 0 {
+                break;
+            }
+
+            reservations.push(ExecutorReservation::new_free(data.executor_id.clone()));
+            data.available_task_slots -= 1;
+            n -= 1;
+            touched = touched.max(idx + 1);
+        }
+
+        // A pass that reserved nothing means we are done (or out of slots).
+        if remaining_before_pass == n {
+            break;
+        }
+    }
+
+    let txn_ops = candidates
+        .into_iter()
+        .take(touched)
+        .map(|data| {
+            let executor_id = data.executor_id.clone();
+            let proto: protobuf::ExecutorData = data.into();
+            let encoded = encode_protobuf(&proto)?;
+            Ok((Operation::Put(encoded), Keyspace::Slots, executor_id))
+        })
+        .collect::<error::Result<Vec<_>>>()?;
+
+    Ok((reservations, txn_ops))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::state::backend::cluster::{ClusterState, DefaultClusterState};
+    use crate::state::backend::sled::SledClient;
+
+    use ballista_core::error::Result;
+    use ballista_core::serde::protobuf::{
+        executor_status, ExecutorHeartbeat, ExecutorStatus,
+    };
+
+    use futures::StreamExt;
+
+    use std::sync::Arc;
+
+    #[tokio::test]
+    async fn test_heartbeat_stream() -> Result<()> {
+        let sled = Arc::new(SledClient::try_new_temporary()?);
+
+        let cluster_state: Arc<dyn ClusterState> =
+            Arc::new(DefaultClusterState::new(sled));
+
+        for i in 0..10 {
+            let mut heartbeat_stream = cluster_state.executor_heartbeat_stream().await?;
+
+            cluster_state
+                .save_executor_heartbeat(ExecutorHeartbeat {
+                    executor_id: i.to_string(),
+                    timestamp: 0,
+                    metrics: vec![],
+                    status: Some(ExecutorStatus {
+                        status: Some(executor_status::Status::Active(String::default())),
+                    }),
+                })
+                .await?;
+
+            let received = if let Some(event) = heartbeat_stream.next().await {
+                event.executor_id == i.to_string()
+            } else {
+                false
+            };
+
+            assert!(received, "Did not receive heartbeat for executor {}", i);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_heartbeats() -> Result<()> {
+        let sled = Arc::new(SledClient::try_new_temporary()?);
+
+        let cluster_state: Arc<dyn ClusterState> =
+            Arc::new(DefaultClusterState::new(sled));
+
+        // Add 10 executor heartbeats
+        for i in 0..10 {
+            cluster_state
+                .save_executor_heartbeat(ExecutorHeartbeat {
+                    executor_id: i.to_string(),
+                    timestamp: i as u64,
+                    metrics: vec![],
+                    status: Some(ExecutorStatus {
+                        status: Some(executor_status::Status::Active(String::default())),
+                    }),
+                })
+                .await?;
+        }
+
+        let heartbeats = cluster_state.executor_heartbeats().await?;
+
+        // Check that all 10 are present in the global view
+        for i in 0..10 {
+            let id = i.to_string();
+            if let Some(hb) = heartbeats.get(&id) {
+                assert_eq!(
+                    hb.executor_id,
+                    i.to_string(),
+                    "Expected heartbeat in map for {}",
+                    i
+                );
+                assert_eq!(
+                    hb.timestamp, i,
+                    "Expected timestamp to be correct for {}",
+                    i
+                );
+            } else {
+                panic!("Expected heartbeat for executor {}", i);
+            }
+        }
+
+        // Send new heartbeat with updated timestamp
+        cluster_state
+            .save_executor_heartbeat(ExecutorHeartbeat {
+                executor_id: "0".to_string(),
+                timestamp: 100,
+                metrics: vec![],
+                status: Some(ExecutorStatus {
+                    status: Some(executor_status::Status::Active(String::default())),
+                }),
+            })
+            .await?;
+
+        let heartbeats = cluster_state.executor_heartbeats().await?;
+
+        // Executor 0 must still be present (not silently skipped) with its new timestamp
+        let hb = heartbeats.get("0").expect("Expected heartbeat for executor 0");
+        assert_eq!(hb.executor_id, "0", "Expected heartbeat in map for 0");
+        assert_eq!(hb.timestamp, 100, "Expected timestamp to be updated for 0");
+
+        for i in 1..10 {
+            let id = i.to_string();
+            if let Some(hb) = heartbeats.get(&id) {
+                assert_eq!(
+                    hb.executor_id,
+                    i.to_string(),
+                    "Expected heartbeat in map for {}",
+                    i
+                );
+                assert_eq!(
+                    hb.timestamp, i,
+                    "Expected timestamp to be correct for {}",
+                    i
+                );
+            } else {
+                panic!("Expected heartbeat for executor {}", i);
+            }
+        }
+
+        Ok(())
+    }
+}
diff --git a/ballista/scheduler/src/state/backend/mod.rs b/ballista/scheduler/src/state/backend/mod.rs
index 0807f3c2..5c859937 100644
--- a/ballista/scheduler/src/state/backend/mod.rs
+++ b/ballista/scheduler/src/state/backend/mod.rs
@@ -22,6 +22,7 @@ use std::collections::HashSet;
 use std::fmt;
 use tokio::sync::OwnedMutexGuard;
+pub mod cluster;
 #[cfg(feature = "etcd")]
 pub mod etcd;
 pub mod memory;
@@ -31,7 +32,7 @@ mod utils;
 // an enum used to configure the backend
 // needs to be visible to code generated by configure_me
-#[derive(Debug, Clone, ArgEnum, serde::Deserialize)]
+#[derive(Debug, Clone, ArgEnum, serde::Deserialize, PartialEq, Eq)]
 pub enum StateBackend {
     Etcd,
     Memory,
@@ -131,6 +132,17 @@ pub trait StateBackendClient: Send + Sync {
     async fn delete(&self, keyspace: Keyspace, key: &str) -> Result<()>;
 }
+/// Method of distributing tasks to available executor slots
+#[derive(Debug, Clone, Copy)]
+pub enum TaskDistribution {
+    /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
+    /// as are currently available
+    Bias,
+    /// Distribute tasks evenly across executors. This will iterate through available executors
+    /// and assign one task to each executor until all tasks are assigned.
+    RoundRobin,
+}
+
 /// A Watch is a cancelable stream of put or delete events in the [StateBackendClient]
 #[tonic::async_trait]
 pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 165502b2..a6234528 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -15,15 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use crate::state::backend::{Keyspace, Operation, StateBackendClient, WatchEvent};
+use crate::state::backend::TaskDistribution;
-use crate::state::{decode_into, decode_protobuf, encode_protobuf, with_lock};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf;
 use crate::config::SlotsPolicy;
+use crate::state::backend::cluster::ClusterState;
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
 use ballista_core::serde::protobuf::{
@@ -88,7 +88,8 @@ pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
 pub(crate) struct ExecutorManager {
     // executor slot policy
     slots_policy: SlotsPolicy,
-    state: Arc<dyn StateBackendClient>,
+    task_distribution: TaskDistribution,
+    cluster_state: Arc<dyn ClusterState>,
     // executor_id -> ExecutorMetadata map
     executor_metadata: Arc<DashMap<String, ExecutorMetadata>>,
     // executor_id -> ExecutorHeartbeat map
@@ -102,12 +103,20 @@ pub(crate) struct ExecutorManager {
 impl ExecutorManager {
     pub(crate) fn new(
-        state: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         slots_policy: SlotsPolicy,
     ) -> Self {
+        let task_distribution = match slots_policy {
+            SlotsPolicy::Bias => TaskDistribution::Bias,
+            SlotsPolicy::RoundRobin | SlotsPolicy::RoundRobinLocal => {
+                TaskDistribution::RoundRobin
+            }
+        };
+
         Self {
             slots_policy,
-            state,
+            task_distribution,
+            cluster_state,
             executor_metadata: Arc::new(DashMap::new()),
             executors_heartbeat: Arc::new(DashMap::new()),
             executor_data: Arc::new(Mutex::new(HashMap::new())),
@@ -116,16 +125,33 @@ impl ExecutorManager {
         }
     }
-    /// Initialize the `ExecutorManager` state. This will fill the `executor_heartbeats` value
-    /// with existing active heartbeats. Then new updates will be consumed through the `ExecutorHeartbeatListener`
+    /// Load the currently active executor heartbeats into the in-memory cache, then spawn a
+    /// background task that applies the cluster heartbeat stream to it, evicting dead executors
     pub async fn init(&self) -> Result<()> {
         self.init_active_executor_heartbeats().await?;
-        let heartbeat_listener = ExecutorHeartbeatListener::new(
-            self.state.clone(),
-            self.executors_heartbeat.clone(),
-            self.dead_executors.clone(),
-        );
-        heartbeat_listener.start().await
+
+        let mut heartbeat_stream = self.cluster_state.executor_heartbeat_stream().await?;
+
+        info!("Initializing heartbeat listener");
+
+        let heartbeats = self.executors_heartbeat.clone();
+        let dead_executors = self.dead_executors.clone();
+        tokio::task::spawn(async move {
+            while let Some(heartbeat) = heartbeat_stream.next().await {
+                let executor_id = heartbeat.executor_id.clone();
+                if let Some(ExecutorStatus {
+                    status: Some(executor_status::Status::Dead(_)),
+                }) = heartbeat.status
+                {
+                    heartbeats.remove(&executor_id);
+                    dead_executors.insert(executor_id);
+                } else {
+                    heartbeats.insert(executor_id, heartbeat);
+                }
+            }
+        });
+
+        Ok(())
     }
     /// Reserve up to n executor task slots. Once reserved these slots will not be available
@@ -135,7 +161,13 @@ impl ExecutorManager {
         if self.slots_policy.is_local() {
             self.reserve_slots_local(n).await
         } else {
-            self.reserve_slots_global(n).await
+            let alive_executors = self.get_alive_executors_within_one_minute();
+
+            debug!("Alive executors: {:?}", alive_executors);
+
+            self.cluster_state
+                .reserve_slots(n, self.task_distribution, Some(alive_executors))
+                .await
         }
     }
@@ -210,152 +242,6 @@ impl ExecutorManager {
         Ok(reservations)
     }
-    /// Reserve up to n executor task slots with considering the global resource snapshot
-    async fn reserve_slots_global(&self, n: u32) -> Result<Vec<ExecutorReservation>> {
-        let lock = self.state.lock(Keyspace::Slots, "global").await?;
-
-        with_lock(lock, async {
-            debug!("Attempting to reserve {} executor slots", n);
-            let start = Instant::now();
-
-            let alive_executors = self.get_alive_executors_within_one_minute();
-
-            let (reservations, txn_ops) = match self.slots_policy {
-                SlotsPolicy::Bias => {
-                    self.reserve_slots_global_bias(n, alive_executors).await?
-                }
-                SlotsPolicy::RoundRobin => {
-                    self.reserve_slots_global_round_robin(n, alive_executors)
-                        .await?
-                }
-                _ => {
-                    return Err(BallistaError::General(format!(
-                        "Reservation policy {:?} is not supported",
-                        self.slots_policy
-                    )))
-                }
-            };
-
-            self.state.apply_txn(txn_ops).await?;
-
-            let elapsed = start.elapsed();
-            info!(
-                "Reserved {} executor slots in {:?}",
-                reservations.len(),
-                elapsed
-            );
-
-            Ok(reservations)
-        })
-        .await
-    }
-
-    /// It will get ExecutorReservation from one executor as many as possible.
-    /// By this way, it can reduce the chance of decoding and encoding ExecutorData.
-    /// However, it may make the whole cluster unbalanced,
-    /// which means some executors may be very busy while other executors may be idle.
-    async fn reserve_slots_global_bias(
-        &self,
-        mut n: u32,
-        alive_executors: HashSet<String>,
-    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
-        let mut reservations: Vec<ExecutorReservation> = vec![];
-        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
-        for executor_id in alive_executors {
-            if n == 0 {
-                break;
-            }
-
-            let value = self.state.get(Keyspace::Slots, &executor_id).await?;
-            let mut data = decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
-            let take = std::cmp::min(data.available_task_slots, n);
-
-            for _ in 0..take {
-                reservations.push(ExecutorReservation::new_free(executor_id.clone()));
-                data.available_task_slots -= 1;
-                n -= 1;
-            }
-
-            let proto: protobuf::ExecutorData = data.into();
-            let new_data = encode_protobuf(&proto)?;
-            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
-        }
-
-        Ok((reservations, txn_ops))
-    }
-
-    /// Create ExecutorReservation in a round robin way to evenly assign tasks to executors
-    async fn reserve_slots_global_round_robin(
-        &self,
-        mut n: u32,
-        alive_executors: HashSet<String>,
-    ) -> Result<(Vec<ExecutorReservation>, Vec<(Operation, Keyspace, String)>)> {
-        let mut reservations: Vec<ExecutorReservation> = vec![];
-        let mut txn_ops: Vec<(Operation, Keyspace, String)> = vec![];
-
-        let all_executor_data = self
-            .state
-            .scan(Keyspace::Slots, None)
-            .await?
-            .into_iter()
-            .map(|(_, data)| decode_into::<protobuf::ExecutorData, ExecutorData>(&data))
-            .collect::<Result<Vec<ExecutorData>>>()?;
-
-        let mut available_executor_data: Vec<ExecutorData> = all_executor_data
-            .into_iter()
-            .filter_map(|data| {
-                (data.available_task_slots > 0
-                    && alive_executors.contains(&data.executor_id))
-                .then_some(data)
-            })
-            .collect();
-        available_executor_data
-            .sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
-
-        // Exclusive
-        let mut last_updated_idx = 0usize;
-        loop {
-            let n_before = n;
-            for (idx, data) in available_executor_data.iter_mut().enumerate() {
-                if n == 0 {
-                    break;
-                }
-
-                // Since the vector is sorted in descending order,
-                // if finding one executor has not enough slots, the following will have not enough, either
-                if data.available_task_slots == 0 {
-                    break;
-                }
-
-                reservations
-                    .push(ExecutorReservation::new_free(data.executor_id.clone()));
-                data.available_task_slots -= 1;
-                n -= 1;
-
-                if idx >= last_updated_idx {
-                    last_updated_idx = idx + 1;
-                }
-            }
-
-            if n_before == n {
-                break;
-            }
-        }
-
-        for (idx, data) in available_executor_data.into_iter().enumerate() {
-            if idx >= last_updated_idx {
-                break;
-            }
-            let executor_id = data.executor_id.clone();
-            let proto: protobuf::ExecutorData = data.into();
-            let new_data = encode_protobuf(&proto)?;
-            txn_ops.push((Operation::Put(new_data), Keyspace::Slots, executor_id));
-        }
-
-        Ok((reservations, txn_ops))
-    }
-
     /// Returned reserved task slots to the pool of available slots. This operation is atomic
     /// so either the entire pool of reserved task slots it returned or none are.
     pub async fn cancel_reservations(
@@ -365,7 +251,7 @@ impl ExecutorManager {
         if self.slots_policy.is_local() {
             self.cancel_reservations_local(reservations).await
         } else {
-            self.cancel_reservations_global(reservations).await
+            self.cluster_state.cancel_reservations(reservations).await
         }
     }
@@ -394,54 +280,6 @@ impl ExecutorManager {
         Ok(())
     }
-    async fn cancel_reservations_global(
-        &self,
-        reservations: Vec<ExecutorReservation>,
-    ) -> Result<()> {
-        let lock = self.state.lock(Keyspace::Slots, "global").await?;
-
-        with_lock(lock, async {
-            let num_reservations = reservations.len();
-            debug!("Cancelling {} reservations", num_reservations);
-            let start = Instant::now();
-
-            let mut executor_slots: HashMap<String, ExecutorData> = HashMap::new();
-
-            for reservation in reservations {
-                let executor_id = &reservation.executor_id;
-                if let Some(data) = executor_slots.get_mut(executor_id) {
-                    data.available_task_slots += 1;
-                } else {
-                    let value = self.state.get(Keyspace::Slots, executor_id).await?;
-                    let mut data =
-                        decode_into::<protobuf::ExecutorData, ExecutorData>(&value)?;
-                    data.available_task_slots += 1;
-                    executor_slots.insert(executor_id.clone(), data);
-                }
-            }
-
-            let txn_ops: Vec<(Operation, Keyspace, String)> = executor_slots
-                .into_iter()
-                .map(|(executor_id, data)| {
-                    let proto: protobuf::ExecutorData = data.into();
-                    let new_data = encode_protobuf(&proto)?;
-                    Ok((Operation::Put(new_data), Keyspace::Slots, executor_id))
-                })
-                .collect::<Result<Vec<_>>>()?;
-
-            self.state.apply_txn(txn_ops).await?;
-
-            let elapsed = start.elapsed();
-            info!(
-                "Cancelled {} reservations in {:?}",
-                num_reservations, elapsed
-            );
-
-            Ok(())
-        })
-        .await
-    }
-
     /// Send rpc to Executors to cancel the running tasks
     pub async fn cancel_running_tasks(&self, tasks: Vec<RunningTaskInfo>) -> Result<()> {
         let mut tasks_to_cancel: HashMap<&str, Vec<protobuf::RunningTaskInfo>> =
@@ -595,21 +433,11 @@ impl ExecutorManager {
             }
         }
-        let value = self.state.get(Keyspace::Executors, executor_id).await?;
-
-        let decoded =
-            decode_into::<protobuf::ExecutorMetadata, ExecutorMetadata>(&value)?;
-        Ok(decoded)
+        self.cluster_state.get_executor_metadata(executor_id).await
     }
     pub async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()> {
-        let executor_id = metadata.id.clone();
-        let proto: protobuf::ExecutorMetadata = metadata.into();
-        let value = encode_protobuf(&proto)?;
-
-        self.state
-            .put(Keyspace::Executors, executor_id, value)
-            .await
+        self.cluster_state.save_executor_metadata(metadata).await
     }
     /// Register the executor with the scheduler. This will save the executor metadata and the
@@ -628,8 +456,6 @@ impl ExecutorManager {
     ) -> Result<Vec<ExecutorReservation>> {
         self.test_scheduler_connectivity(&metadata).await?;
-        let executor_id = metadata.id.clone();
-
         let current_ts = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .map_err(|e| {
@@ -640,18 +466,14 @@ impl ExecutorManager {
             })?
             .as_secs();
-        //TODO this should be in a transaction
-        // Now that we know we can connect, save the metadata and slots
-        self.save_executor_metadata(metadata).await?;
-        self.save_executor_heartbeat(protobuf::ExecutorHeartbeat {
-            executor_id: executor_id.clone(),
+        let initial_heartbeat = ExecutorHeartbeat {
+            executor_id: metadata.id.clone(),
             timestamp: current_ts,
             metrics: vec![],
             status: Some(ExecutorStatus {
-                status: Some(executor_status::Status::Active("".to_string())),
+                status: Some(executor_status::Status::Active(String::default())),
             }),
-        })
-        .await?;
+        };
         if !reserve {
             if self.slots_policy.is_local() {
@@ -660,16 +482,20 @@ impl ExecutorManager {
                     .insert(specification.executor_id.clone(), specification.clone());
             }
-            let proto: protobuf::ExecutorData = specification.into();
-            let value = encode_protobuf(&proto)?;
-            self.state.put(Keyspace::Slots, executor_id, value).await?;
+            self.cluster_state
+                .register_executor(metadata, specification, reserve)
+                .await?;
+
+            self.executors_heartbeat
+                .insert(initial_heartbeat.executor_id.clone(), initial_heartbeat);
+
             Ok(vec![])
         } else {
             let mut specification = specification;
             let num_slots = specification.available_task_slots as usize;
             let mut reservations: Vec<ExecutorReservation> = vec![];
             for _ in 0..num_slots {
-                reservations.push(ExecutorReservation::new_free(executor_id.clone()));
+                reservations.push(ExecutorReservation::new_free(metadata.id.clone()));
             }
             specification.available_task_slots = 0;
@@ -680,9 +506,13 @@ impl ExecutorManager {
                     .insert(specification.executor_id.clone(), specification.clone());
             }
-            let proto: protobuf::ExecutorData = specification.into();
-            let value = encode_protobuf(&proto)?;
-            self.state.put(Keyspace::Slots, executor_id, value).await?;
+            self.cluster_state
+                .register_executor(metadata, specification, reserve)
+                .await?;
+
+            self.executors_heartbeat
+                .insert(initial_heartbeat.executor_id.clone(), initial_heartbeat);
+
             Ok(reservations)
         }
     }
@@ -691,29 +521,22 @@ impl ExecutorManager {
     pub async fn remove_executor(
         &self,
         executor_id: &str,
-        _reason: Option<String>,
+        reason: Option<String>,
     ) -> Result<()> {
-        let current_ts = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .map_err(|e| {
-                BallistaError::Internal(format!(
-                    "Error getting current timestamp: {:?}",
-                    e
-                ))
-            })?
-            .as_secs();
+        info!("Removing executor {}: {:?}", executor_id, reason);
+        self.cluster_state.remove_executor(executor_id).await?;
-        self.save_dead_executor_heartbeat(protobuf::ExecutorHeartbeat {
-            executor_id: executor_id.to_owned(),
-            timestamp: current_ts,
-            metrics: vec![],
-            status: Some(ExecutorStatus {
-                status: Some(executor_status::Status::Dead("".to_string())),
-            }),
-        })
-        .await?;
+        let executor_id = executor_id.to_owned();
-        // TODO Check the Executor reservation logic for push-based scheduling
+        self.executors_heartbeat.remove(&executor_id);
+
+        // Drop the locally cached `ExecutorData` for the removed executor
+        {
+            let mut executor_data = self.executor_data.lock();
+            executor_data.remove(&executor_id);
+        }
+
+        self.dead_executors.insert(executor_id);
         Ok(())
     }
@@ -746,12 +569,10 @@ impl ExecutorManager {
     pub(crate) async fn save_executor_heartbeat(
         &self,
-        heartbeat: protobuf::ExecutorHeartbeat,
+        heartbeat: ExecutorHeartbeat,
     ) -> Result<()> {
-        let executor_id = heartbeat.executor_id.clone();
-        let value = encode_protobuf(&heartbeat)?;
-        self.state
-            .put(Keyspace::Heartbeats, executor_id, value)
+        self.cluster_state
+            .save_executor_heartbeat(heartbeat.clone())
             .await?;
         self.executors_heartbeat
@@ -760,45 +581,21 @@ impl ExecutorManager {
         Ok(())
     }
-    pub(crate) async fn save_dead_executor_heartbeat(
-        &self,
-        heartbeat: protobuf::ExecutorHeartbeat,
-    ) -> Result<()> {
-        let executor_id = heartbeat.executor_id.clone();
-        let value = encode_protobuf(&heartbeat)?;
-        self.state
-            .put(Keyspace::Heartbeats, executor_id.clone(), value)
-            .await?;
-
-        self.executors_heartbeat
-            .remove(&heartbeat.executor_id.clone());
-
-        // Remove executor data cache for dead executors
-        {
-            let mut executor_data = self.executor_data.lock();
-            executor_data.remove(&executor_id);
-        }
-
-        self.dead_executors.insert(executor_id);
-        Ok(())
-    }
-
     pub(crate) fn is_dead_executor(&self, executor_id: &str) -> bool {
         self.dead_executors.contains(executor_id)
     }
     /// Initialize the set of active executor heartbeats from storage
     async fn init_active_executor_heartbeats(&self) -> Result<()> {
-        let heartbeats = self.state.scan(Keyspace::Heartbeats, None).await?;
+        let heartbeats = self.cluster_state.executor_heartbeats().await?;
-        for (_, value) in heartbeats {
-            let data: protobuf::ExecutorHeartbeat = decode_protobuf(&value)?;
-            let executor_id = data.executor_id.clone();
+        for (executor_id, heartbeat) in heartbeats {
+            // Only heartbeats whose status is Active are loaded into the cache
             if let Some(ExecutorStatus {
                 status: Some(executor_status::Status::Active(_)),
-            }) = data.status
+            }) = heartbeat.status
             {
-                self.executors_heartbeat.insert(executor_id, data);
+                self.executors_heartbeat.insert(executor_id, heartbeat);
             }
         }
         Ok(())
@@ -851,65 +648,10 @@ impl ExecutorManager {
     }
 }
-/// Rather than doing a scan across persistent state to find alive executors every time
-/// we need to find the set of alive executors, we start a watch on the `Heartbeats` keyspace
-/// and maintain an in-memory copy of the executor heartbeats.
-struct ExecutorHeartbeatListener {
-    state: Arc<dyn StateBackendClient>,
-    executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
-    dead_executors: Arc<DashSet<String>>,
-}
-
-impl ExecutorHeartbeatListener {
-    pub fn new(
-        state: Arc<dyn StateBackendClient>,
-        executors_heartbeat: Arc<DashMap<String, protobuf::ExecutorHeartbeat>>,
-        dead_executors: Arc<DashSet<String>>,
-    ) -> Self {
-        Self {
-            state,
-            executors_heartbeat,
-            dead_executors,
-        }
-    }
-
-    /// Spawn an sync task which will watch the the Heartbeats keyspace and insert
-    /// new heartbeats in the `executors_heartbeat` cache.
-    pub async fn start(&self) -> Result<()> {
-        let mut watch = self
-            .state
-            .watch(Keyspace::Heartbeats, "".to_owned())
-            .await?;
-        let heartbeats = self.executors_heartbeat.clone();
-        let dead_executors = self.dead_executors.clone();
-        tokio::task::spawn(async move {
-            while let Some(event) = watch.next().await {
-                if let WatchEvent::Put(_, value) = event {
-                    if let Ok(data) =
-                        decode_protobuf::<protobuf::ExecutorHeartbeat>(&value)
-                    {
-                        let executor_id = data.executor_id.clone();
-                        // Remove dead executors
-                        if let Some(ExecutorStatus {
-                            status: Some(executor_status::Status::Dead(_)),
-                        }) = data.status
-                        {
-                            heartbeats.remove(&executor_id);
-                            dead_executors.insert(executor_id);
-                        } else {
-                            heartbeats.insert(executor_id, data);
-                        }
-                    }
-                }
-            }
-        });
-        Ok(())
-    }
-}
-
 #[cfg(test)]
 mod test {
     use crate::config::SlotsPolicy;
+    use crate::state::backend::cluster::DefaultClusterState;
     use crate::state::backend::sled::SledClient;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
     use ballista_core::error::Result;
@@ -928,9 +670,11 @@ mod test {
     }
     async fn test_reserve_and_cancel_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
         let executors = test_executors(10, 4);
@@ -943,7 +687,12 @@ mod test {
         // Reserve all the slots
         let reservations = executor_manager.reserve_slots(40).await?;
-        assert_eq!(reservations.len(), 40);
+        assert_eq!(
+            reservations.len(),
+            40,
+            "Expected 40 reservations for policy {:?}",
+            slots_policy
+        );
         // Now cancel them
         executor_manager.cancel_reservations(reservations).await?;
@@ -951,7 +700,12 @@ mod test {
         // Now reserve again
         let reservations = executor_manager.reserve_slots(40).await?;
-        assert_eq!(reservations.len(), 40);
+        assert_eq!(
+            reservations.len(),
+            40,
+            "Expected 40 reservations for policy {:?}",
+            slots_policy
+        );
         Ok(())
     }
@@ -966,9 +720,11 @@ mod test {
     }
     async fn test_reserve_partial_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
         let executors = test_executors(10, 4);
@@ -1021,9 +777,11 @@ mod test {
         let executors = test_executors(10, 4);
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
         for (executor_metadata, executor_data) in executors {
             executor_manager
@@ -1066,9 +824,11 @@ mod test {
     }
     async fn test_register_reserve_inner(slots_policy: SlotsPolicy) -> Result<()> {
-        let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(Arc::new(
+            SledClient::try_new_temporary()?,
+        )));
-        let executor_manager = ExecutorManager::new(state_storage, slots_policy);
+        let executor_manager = ExecutorManager::new(cluster_state, slots_policy);
         let executors = test_executors(10, 4);
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 0671fdbe..99791457 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -33,6 +33,7 @@ use crate::state::task_manager::{TaskLauncher, TaskManager};
 use crate::config::SchedulerConfig;
 use crate::state::execution_graph::TaskDescription;
+use backend::cluster::ClusterState;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
@@ -99,11 +100,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     #[cfg(test)]
     pub fn new_with_default_scheduler_name(
         config_client: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
     ) -> Self {
         SchedulerState::new(
             config_client,
+            cluster_state,
             session_builder,
             codec,
             "localhost:50050".to_owned(),
@@ -113,6 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     pub fn new(
         config_client: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
@@ -120,7 +124,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(
-                config_client.clone(),
+                cluster_state,
                 config.executor_slots_policy,
             ),
             task_manager: TaskManager::new(
@@ -138,6 +142,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     #[allow(dead_code)]
     pub(crate) fn with_task_launcher(
         config_client: Arc<dyn StateBackendClient>,
+        cluster_state: Arc<dyn ClusterState>,
         session_builder: SessionBuilder,
         codec: BallistaCodec<T, U>,
         scheduler_name: String,
@@ -146,7 +151,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
     ) -> Self {
         Self {
             executor_manager: ExecutorManager::new(
-                config_client.clone(),
+                cluster_state,
                 config.executor_slots_policy,
             ),
             task_manager: TaskManager::with_launcher(
@@ -461,6 +466,7 @@ mod test {
     use ballista_core::utils::default_session_builder;
     use crate::config::SchedulerConfig;
+    use crate::state::backend::cluster::DefaultClusterState;
     use crate::test_utils::BlackholeTaskLauncher;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum};
@@ -474,9 +480,11 @@ mod test {
     #[tokio::test]
     async fn test_offer_free_reservations() -> Result<()> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::new_with_default_scheduler_name(
                 state_storage,
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
             ));
@@ -510,9 +518,11 @@ mod test {
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
                 String::default(),
@@ -595,9 +605,11 @@ mod test {
             .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "4")
             .build()?;
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let state: Arc<SchedulerState<LogicalPlanNode, PhysicalPlanNode>> =
             Arc::new(SchedulerState::with_task_launcher(
                 state_storage,
+                cluster_state,
                 default_session_builder,
                 BallistaCodec::default(),
                 String::default(),
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index ae386382..3f98e370 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -51,6 +51,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::CsvReadOptions;
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::state::backend::cluster::DefaultClusterState;
 use datafusion_proto::protobuf::LogicalPlanNode;
 use parking_lot::Mutex;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
@@ -381,6 +382,7 @@ impl SchedulerTest {
         runner: Option<Arc<dyn TaskRunner>>,
     ) -> Result<Self> {
         let state_storage = Arc::new(SledClient::try_new_temporary()?);
+        let cluster_state = Arc::new(DefaultClusterState::new(state_storage.clone()));
         let ballista_config = BallistaConfig::builder()
             .set(
@@ -415,7 +417,8 @@ impl SchedulerTest {
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
SchedulerServer::with_task_launcher(
"localhost:50050".to_owned(),
- state_storage.clone(),
+ state_storage,
+ cluster_state,
BallistaCodec::default(),
config,
metrics_collector,