This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 926605e7 Add SchedulerConfig for the scheduler configurations, like
event_loop_buffer_size, finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds (#472)
926605e7 is described below
commit 926605e7a135fb74016392cedf94be519c9ba5c1
Author: yahoNanJing <[email protected]>
AuthorDate: Thu Nov 3 07:46:16 2022 +0800
Add SchedulerConfig for the scheduler configurations, like
event_loop_buffer_size, finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds (#472)
* Move data cleanup caller explicitly to the event loop
* Refactor BallistaConfig by extracting common validation logic
* Create a separate mod for BallistaConfig
* Add SchedulerConfig for the scheduler configurations, like
event_loop_buffer_size, finished_job_data_clean_up_interval_seconds,
finished_job_state_clean_up_interval_seconds
* Rename the scheduler config advertise_endpoint to
advertise_flight_result_route_endpoint and add it to the SchedulerConfig
* Allow redundant configurations in ValidConfiguration
* Clean up shuffle data of failed jobs immediately instead of after a delay
* Update user-guide for the scheduler configurations
* Fix doc config name format
* Change the SchedulerConfig to be an explicit struct
* Fix doc format
* Rename advertise-flight-result-route-endpoint to
advertise-flight-sql-endpoint
Co-authored-by: yangzhong <[email protected]>
---
ballista/scheduler/scheduler_config_spec.toml | 14 ++-
ballista/scheduler/src/config.rs | 71 ++++++++++++
ballista/scheduler/src/flight_sql.rs | 8 +-
ballista/scheduler/src/main.rs | 61 ++++-------
ballista/scheduler/src/scheduler_server/grpc.rs | 16 ++-
ballista/scheduler/src/scheduler_server/mod.rs | 121 ++++-----------------
.../src/scheduler_server/query_stage_scheduler.rs | 63 ++---------
ballista/scheduler/src/standalone.rs | 4 +-
ballista/scheduler/src/state/executor_manager.rs | 45 ++++++--
ballista/scheduler/src/state/mod.rs | 36 +++++-
ballista/scheduler/src/state/task_manager.rs | 101 ++++++++---------
docs/source/user-guide/configs.md | 35 +++++-
12 files changed, 295 insertions(+), 280 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index 52d2ee5c..4a4e5389 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -25,7 +25,7 @@ name = "version"
doc = "Print version of this executable"
[[param]]
-name = "advertise_endpoint"
+name = "advertise_flight_sql_endpoint"
type = "String"
doc = "Route for proxying flight results via scheduler. Should be of the form
'IP:PORT'"
@@ -83,6 +83,18 @@ type = "u32"
default = "10000"
doc = "Event loop buffer size. Default: 10000"
+[[param]]
+name = "finished_job_data_clean_up_interval_seconds"
+type = "u64"
+default = "300"
+doc = "Delayed interval for cleaning up finished job data. Default: 300"
+
+[[param]]
+name = "finished_job_state_clean_up_interval_seconds"
+type = "u64"
+default = "3600"
+doc = "Delayed interval for cleaning up finished job state. Default: 3600"
+
[[param]]
name = "executor_slots_policy"
type = "ballista_scheduler::config::SlotsPolicy"
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 2cb940c9..8ef33219 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -18,9 +18,80 @@
//! Ballista scheduler specific configuration
+use ballista_core::config::TaskSchedulingPolicy;
use clap::ArgEnum;
use std::fmt;
+/// Configurations for the ballista scheduler of scheduling jobs and tasks
+#[derive(Debug, Clone)]
+pub struct SchedulerConfig {
+ /// The task scheduling policy for the scheduler
+ pub scheduling_policy: TaskSchedulingPolicy,
+ /// The event loop buffer size. For a system with high throughput, a larger
value such as 1000000 is recommended
+ pub event_loop_buffer_size: u32,
+ /// The executor slots policy for the scheduler. For a cluster with a single
scheduler, round-robin-local is recommended
+ pub executor_slots_policy: SlotsPolicy,
+ /// The delay in seconds before cleaning up the data (mainly shuffle data) of
successfully finished jobs; 0 disables this clean-up. Data of failed or cancelled jobs is cleaned up immediately
+ pub finished_job_data_clean_up_interval_seconds: u64,
+ /// The delay in seconds before cleaning up finished job state stored in the
backend; 0 disables this clean-up.
+ pub finished_job_state_clean_up_interval_seconds: u64,
+ /// The route endpoint for proxying flight sql results via scheduler
+ pub advertise_flight_sql_endpoint: Option<String>,
+}
+
+impl Default for SchedulerConfig {
+ fn default() -> Self {
+ Self {
+ scheduling_policy: TaskSchedulingPolicy::PullStaged,
+ event_loop_buffer_size: 10000,
+ executor_slots_policy: SlotsPolicy::Bias,
+ finished_job_data_clean_up_interval_seconds: 300,
+ finished_job_state_clean_up_interval_seconds: 3600,
+ advertise_flight_sql_endpoint: None,
+ }
+ }
+}
+
+impl SchedulerConfig {
+ pub fn is_push_staged_scheduling(&self) -> bool {
+ matches!(self.scheduling_policy, TaskSchedulingPolicy::PushStaged)
+ }
+
+ pub fn with_scheduler_policy(mut self, policy: TaskSchedulingPolicy) ->
Self {
+ self.scheduling_policy = policy;
+ self
+ }
+
+ pub fn with_event_loop_buffer_size(mut self, buffer_size: u32) -> Self {
+ self.event_loop_buffer_size = buffer_size;
+ self
+ }
+
+ pub fn with_finished_job_data_clean_up_interval_seconds(
+ mut self,
+ interval_seconds: u64,
+ ) -> Self {
+ self.finished_job_data_clean_up_interval_seconds = interval_seconds;
+ self
+ }
+
+ pub fn with_finished_job_state_clean_up_interval_seconds(
+ mut self,
+ interval_seconds: u64,
+ ) -> Self {
+ self.finished_job_state_clean_up_interval_seconds = interval_seconds;
+ self
+ }
+
+ pub fn with_advertise_flight_sql_endpoint(
+ mut self,
+ endpoint: Option<String>,
+ ) -> Self {
+ self.advertise_flight_sql_endpoint = endpoint;
+ self
+ }
+}
+
// an enum used to configure the executor slots policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
diff --git a/ballista/scheduler/src/flight_sql.rs
b/ballista/scheduler/src/flight_sql.rs
index 2fb6e973..a3c706ef 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -64,7 +64,6 @@ use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::prelude::SessionContext;
use datafusion_proto::protobuf::LogicalPlanNode;
-use itertools::Itertools;
use prost::Message;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;
@@ -243,7 +242,12 @@ impl FlightSqlServiceImpl {
))?
};
- let (host, port) = match &self.server.advertise_endpoint {
+ let (host, port) = match &self
+ .server
+ .state
+ .config
+ .advertise_flight_sql_endpoint
+ {
Some(endpoint) => {
let advertise_endpoint_vec: Vec<&str> =
endpoint.split(":").collect();
match advertise_endpoint_vec.as_slice() {
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index b8eac568..692ba14c 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -43,7 +43,6 @@ use datafusion_proto::protobuf::LogicalPlanNode;
use ballista_scheduler::scheduler_server::SchedulerServer;
use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};
-use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;
use log::info;
@@ -62,7 +61,8 @@ mod config {
}
use ballista_core::utils::create_grpc_server;
-use ballista_scheduler::config::SlotsPolicy;
+
+use ballista_scheduler::config::SchedulerConfig;
#[cfg(feature = "flight-sql")]
use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
use config::prelude::*;
@@ -72,10 +72,7 @@ async fn start_server(
scheduler_name: String,
config_backend: Arc<dyn StateBackendClient>,
addr: SocketAddr,
- scheduling_policy: TaskSchedulingPolicy,
- slots_policy: SlotsPolicy,
- event_loop_buffer_size: usize,
- advertise_endpoint: Option<String>,
+ config: SchedulerConfig,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
@@ -84,28 +81,16 @@ async fn start_server(
// Should only call SchedulerServer::new() once in the process
info!(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
- scheduling_policy
+ config.scheduling_policy
);
let mut scheduler_server: SchedulerServer<LogicalPlanNode,
PhysicalPlanNode> =
- match scheduling_policy {
- TaskSchedulingPolicy::PushStaged =>
SchedulerServer::new_with_policy(
- scheduler_name,
- config_backend.clone(),
- scheduling_policy,
- slots_policy,
- BallistaCodec::default(),
- event_loop_buffer_size,
- advertise_endpoint,
- ),
- _ => SchedulerServer::new(
- scheduler_name,
- config_backend.clone(),
- BallistaCodec::default(),
- event_loop_buffer_size,
- advertise_endpoint,
- ),
- };
+ SchedulerServer::new(
+ scheduler_name,
+ config_backend.clone(),
+ BallistaCodec::default(),
+ config,
+ );
scheduler_server.init().await?;
@@ -207,7 +192,7 @@ async fn main() -> Result<()> {
let addr = format!("{}:{}", bind_host, port);
let addr = addr.parse()?;
- let client: Arc<dyn StateBackendClient> = match opt.config_backend {
+ let config_backend: Arc<dyn StateBackendClient> = match opt.config_backend
{
#[cfg(not(any(feature = "sled", feature = "etcd")))]
_ => std::compile_error!(
"To build the scheduler enable at least one config backend feature
(`etcd` or `sled`)"
@@ -248,18 +233,16 @@ async fn main() -> Result<()> {
}
};
- let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
- let slots_policy: SlotsPolicy = opt.executor_slots_policy;
- let event_loop_buffer_size = opt.event_loop_buffer_size as usize;
- start_server(
- scheduler_name,
- client,
- addr,
- scheduling_policy,
- slots_policy,
- event_loop_buffer_size,
- opt.advertise_endpoint,
- )
- .await?;
+ let config = SchedulerConfig {
+ scheduling_policy: opt.scheduler_policy,
+ event_loop_buffer_size: opt.event_loop_buffer_size,
+ executor_slots_policy: opt.executor_slots_policy,
+ finished_job_data_clean_up_interval_seconds: opt
+ .finished_job_data_clean_up_interval_seconds,
+ finished_job_state_clean_up_interval_seconds: opt
+ .finished_job_state_clean_up_interval_seconds,
+ advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
+ };
+ start_server(scheduler_name, config_backend, addr, config).await?;
Ok(())
}
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index f4c75da8..73a1c64f 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy,
BALLISTA_JOB_NAME};
+use ballista_core::config::{BallistaConfig, BALLISTA_JOB_NAME};
use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId,
Query};
use std::convert::TryInto;
@@ -58,7 +58,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
&self,
request: Request<PollWorkParams>,
) -> Result<Response<PollWorkResult>, Status> {
- if let TaskSchedulingPolicy::PushStaged = self.policy {
+ if self.state.config.is_push_staged_scheduling() {
error!("Poll work interface is not supported for push-based task
scheduling");
return Err(tonic::Status::failed_precondition(
"Bad request because poll work is not supported for push-based
task scheduling",
@@ -207,7 +207,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
// If we are using push-based scheduling then reserve this
executors slots and send
// them for scheduling tasks.
- if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
+ if self.state.config.is_push_staged_scheduling() {
self.offer_reservation(reservations).await?;
}
@@ -553,6 +553,7 @@ mod test {
use datafusion_proto::protobuf::LogicalPlanNode;
use tonic::Request;
+ use crate::config::SchedulerConfig;
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, executor_status,
ExecutorRegistration,
@@ -576,8 +577,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
- 10000,
- None,
+ SchedulerConfig::default(),
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
@@ -663,8 +663,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
- 10000,
- None,
+ SchedulerConfig::default(),
);
scheduler.init().await?;
@@ -744,8 +743,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
- 10000,
- None,
+ SchedulerConfig::default(),
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index 079654c1..f81135ae 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
-use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
use ballista_core::event_loop::{EventLoop, EventSender};
use ballista_core::serde::protobuf::{StopExecutorParams, TaskStatus};
@@ -30,7 +29,7 @@ use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::logical_plan::AsLogicalPlan;
-use crate::config::SlotsPolicy;
+use crate::config::SchedulerConfig;
use log::{error, warn};
use crate::scheduler_server::event::QueryStageSchedulerEvent;
@@ -57,113 +56,40 @@ pub(crate) type SessionBuilder = fn(SessionConfig) ->
SessionState;
#[derive(Clone)]
pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> {
pub scheduler_name: String,
- pub advertise_endpoint: Option<String>,
- pub(crate) state: Arc<SchedulerState<T, U>>,
pub start_time: u128,
- policy: TaskSchedulingPolicy,
+ pub(crate) state: Arc<SchedulerState<T, U>>,
pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerServer<T, U> {
pub fn new(
scheduler_name: String,
- config: Arc<dyn StateBackendClient>,
+ config_backend: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
- event_loop_buffer_size: usize,
- advertise_endpoint: Option<String>,
+ config: SchedulerConfig,
) -> Self {
let state = Arc::new(SchedulerState::new(
- config,
+ config_backend,
default_session_builder,
codec,
scheduler_name.clone(),
- SlotsPolicy::Bias,
+ config.clone(),
));
-
- SchedulerServer::new_with_state(
- scheduler_name,
- TaskSchedulingPolicy::PullStaged,
- state,
- event_loop_buffer_size,
- advertise_endpoint,
- )
- }
-
- pub fn new_with_builder(
- scheduler_name: String,
- config: Arc<dyn StateBackendClient>,
- codec: BallistaCodec<T, U>,
- session_builder: SessionBuilder,
- event_loop_buffer_size: usize,
- advertise_endpoint: Option<String>,
- ) -> Self {
- let state = Arc::new(SchedulerState::new(
- config,
- session_builder,
- codec,
- scheduler_name.clone(),
- SlotsPolicy::Bias,
- ));
-
- SchedulerServer::new_with_state(
- scheduler_name,
- TaskSchedulingPolicy::PullStaged,
- state,
- event_loop_buffer_size,
- advertise_endpoint,
- )
- }
-
- pub fn new_with_policy(
- scheduler_name: String,
- config: Arc<dyn StateBackendClient>,
- scheduling_policy: TaskSchedulingPolicy,
- slots_policy: SlotsPolicy,
- codec: BallistaCodec<T, U>,
- event_loop_buffer_size: usize,
- advertise_endpoint: Option<String>,
- ) -> Self {
- let state = Arc::new(SchedulerState::new(
- config,
- default_session_builder,
- codec,
- scheduler_name.clone(),
- slots_policy,
- ));
-
- SchedulerServer::new_with_state(
- scheduler_name,
- scheduling_policy,
- state,
- event_loop_buffer_size,
- advertise_endpoint,
- )
- }
-
- pub(crate) fn new_with_state(
- scheduler_name: String,
- policy: TaskSchedulingPolicy,
- state: Arc<SchedulerState<T, U>>,
- event_loop_buffer_size: usize,
- advertise_endpoint: Option<String>,
- ) -> Self {
- let query_stage_scheduler =
- Arc::new(QueryStageScheduler::new(state.clone(), policy));
+ let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone()));
let query_stage_event_loop = EventLoop::new(
"query_stage".to_owned(),
- event_loop_buffer_size,
+ config.event_loop_buffer_size as usize,
query_stage_scheduler,
);
+
Self {
scheduler_name,
- state,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
- policy,
+ state,
query_stage_event_loop,
- advertise_endpoint,
}
}
@@ -331,7 +257,7 @@ mod test {
};
use ballista_core::error::Result;
- use crate::config::SlotsPolicy;
+ use crate::config::SchedulerConfig;
use ballista_core::serde::protobuf::{
failed_task, job_status, task_status, ExecutionError, FailedTask,
JobStatus,
PhysicalPlanNode, ShuffleWritePartition, SuccessfulTask, TaskStatus,
@@ -340,13 +266,11 @@ mod test {
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
use ballista_core::serde::BallistaCodec;
- use ballista_core::utils::default_session_builder;
use crate::scheduler_server::SchedulerServer;
use crate::state::backend::standalone::StandaloneClient;
use crate::state::executor_manager::ExecutorReservation;
- use crate::state::SchedulerState;
use crate::test_utils::{await_condition, ExplodingTableProvider};
#[tokio::test]
@@ -795,14 +719,11 @@ mod test {
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
- SchedulerServer::new_with_policy(
+ SchedulerServer::new(
"localhost:50050".to_owned(),
state_storage.clone(),
- scheduling_policy,
- SlotsPolicy::Bias,
BallistaCodec::default(),
- 10000,
- None,
+
SchedulerConfig::default().with_scheduler_policy(scheduling_policy),
);
scheduler.init().await?;
@@ -812,18 +733,14 @@ mod test {
async fn test_push_staged_scheduler(
) -> Result<SchedulerServer<LogicalPlanNode, PhysicalPlanNode>> {
let state_storage = Arc::new(StandaloneClient::try_new_temporary()?);
- let state = Arc::new(SchedulerState::new_with_default_scheduler_name(
- state_storage,
- default_session_builder,
- BallistaCodec::default(),
- ));
+ let config = SchedulerConfig::default()
+ .with_scheduler_policy(TaskSchedulingPolicy::PushStaged);
let mut scheduler: SchedulerServer<LogicalPlanNode, PhysicalPlanNode> =
- SchedulerServer::new_with_state(
+ SchedulerServer::new(
"localhost:50050".to_owned(),
- TaskSchedulingPolicy::PushStaged,
- state,
- 10000,
- None,
+ state_storage,
+ BallistaCodec::default(),
+ config,
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index ee190170..5c31bdaa 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -16,7 +16,6 @@
// under the License.
use std::sync::Arc;
-use std::time::Duration;
use async_trait::async_trait;
use log::{debug, error, info};
@@ -24,7 +23,6 @@ use log::{debug, error, info};
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventSender};
-use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::AsExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
use tokio::sync::mpsc;
@@ -34,24 +32,16 @@ use
crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::state::executor_manager::ExecutorReservation;
use crate::state::SchedulerState;
-// TODO move to configuration file
-/// Clean up job data interval
-pub const CLEANUP_FINISHED_JOB_DELAY_SECS: u64 = 300;
-
pub(crate) struct QueryStageScheduler<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
> {
state: Arc<SchedulerState<T, U>>,
- policy: TaskSchedulingPolicy,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
QueryStageScheduler<T, U> {
- pub(crate) fn new(
- state: Arc<SchedulerState<T, U>>,
- policy: TaskSchedulingPolicy,
- ) -> Self {
- Self { state, policy }
+ pub(crate) fn new(state: Arc<SchedulerState<T, U>>) -> Self {
+ Self { state }
}
}
@@ -103,7 +93,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
}
QueryStageSchedulerEvent::JobSubmitted(job_id) => {
info!("Job {} submitted", job_id);
- if matches!(self.policy, TaskSchedulingPolicy::PushStaged) {
+ if self.state.config.is_push_staged_scheduling() {
let available_tasks = self
.state
.task_manager
@@ -136,66 +126,35 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
error!("Job {} failed: {}", job_id, failure_reason);
self.state
.task_manager
- .fail_unscheduled_job(
- &job_id,
- failure_reason,
- CLEANUP_FINISHED_JOB_DELAY_SECS,
- )
+ .fail_unscheduled_job(&job_id, failure_reason)
.await?;
}
QueryStageSchedulerEvent::JobFinished(job_id) => {
info!("Job {} success", job_id);
- self.state
- .task_manager
- .succeed_job(&job_id, CLEANUP_FINISHED_JOB_DELAY_SECS)
- .await?;
- let executor_manager = self.state.executor_manager.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(
- CLEANUP_FINISHED_JOB_DELAY_SECS,
- ))
- .await;
- executor_manager.clean_up_job_data(job_id).await;
- });
+ self.state.task_manager.succeed_job(&job_id).await?;
+ self.state.clean_up_successful_job(job_id);
}
QueryStageSchedulerEvent::JobRunningFailed(job_id, failure_reason)
=> {
error!("Job {} running failed", job_id);
let tasks = self
.state
.task_manager
- .abort_job(&job_id, failure_reason,
CLEANUP_FINISHED_JOB_DELAY_SECS)
+ .abort_job(&job_id, failure_reason)
.await?;
if !tasks.is_empty() {
tx_event
.post_event(QueryStageSchedulerEvent::CancelTasks(tasks))
.await?;
}
- let executor_manager = self.state.executor_manager.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(
- CLEANUP_FINISHED_JOB_DELAY_SECS,
- ))
- .await;
- executor_manager.clean_up_job_data(job_id).await;
- });
+ self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::JobUpdated(job_id) => {
info!("Job {} Updated", job_id);
self.state.task_manager.update_job(&job_id).await?;
}
QueryStageSchedulerEvent::JobCancel(job_id) => {
- self.state
- .task_manager
- .cancel_job(&job_id, CLEANUP_FINISHED_JOB_DELAY_SECS)
- .await?;
- let executor_manager = self.state.executor_manager.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(
- CLEANUP_FINISHED_JOB_DELAY_SECS,
- ))
- .await;
- executor_manager.clean_up_job_data(job_id).await;
- });
+ self.state.task_manager.cancel_job(&job_id).await?;
+ self.state.clean_up_failed_job(job_id);
}
QueryStageSchedulerEvent::TaskUpdating(executor_id, tasks_status)
=> {
let num_status = tasks_status.len();
@@ -205,7 +164,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
.await
{
Ok((stage_events, offers)) => {
- if matches!(self.policy,
TaskSchedulingPolicy::PushStaged) {
+ if self.state.config.is_push_staged_scheduling() {
tx_event
.post_event(
QueryStageSchedulerEvent::ReservationOffering(offers),
diff --git a/ballista/scheduler/src/standalone.rs
b/ballista/scheduler/src/standalone.rs
index dec6acef..eb64cfb7 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::config::SchedulerConfig;
use crate::{
scheduler_server::SchedulerServer,
state::backend::standalone::StandaloneClient,
};
@@ -38,8 +39,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr>
{
"localhost:50050".to_owned(),
Arc::new(client),
BallistaCodec::default(),
- 10000,
- None,
+ SchedulerConfig::default(),
);
scheduler_server.init().await?;
let server = SchedulerGrpcServer::new(scheduler_server.clone());
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 8fd776b3..d86674f1 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -483,25 +483,52 @@ impl ExecutorManager {
Ok(())
}
+ /// Send rpc to Executors to clean up the job data after a delay of
clean_up_interval seconds; an interval of 0 disables the clean-up
+ pub(crate) fn clean_up_job_data_delayed(
+ &self,
+ job_id: String,
+ clean_up_interval: u64,
+ ) {
+ if clean_up_interval == 0 {
+ info!(
+ "The interval is 0 and the clean up for job data {} will not
triggered",
+ job_id
+ );
+ return;
+ }
+
+ let executor_manager = self.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(clean_up_interval)).await;
+ executor_manager.clean_up_job_data_inner(job_id).await;
+ });
+ }
+
+ /// Send rpc to Executors to clean up the job data in a spawned tokio task, without delay
+ pub fn clean_up_job_data(&self, job_id: String) {
+ let executor_manager = self.clone();
+ tokio::spawn(async move {
+ executor_manager.clean_up_job_data_inner(job_id).await;
+ });
+ }
+
/// Send rpc to Executors to clean up the job data
- pub async fn clean_up_job_data(&self, job_id: String) {
+ async fn clean_up_job_data_inner(&self, job_id: String) {
let alive_executors = self.get_alive_executors_within_one_minute();
for executor in alive_executors {
let job_id_clone = job_id.to_owned();
if let Ok(mut client) = self.get_client(&executor).await {
tokio::spawn(async move {
+ if let Err(err) = client
+ .remove_job_data(RemoveJobDataParams {
+ job_id: job_id_clone,
+ })
+ .await
{
- if let Err(err) = client
- .remove_job_data(RemoveJobDataParams {
- job_id: job_id_clone,
- })
- .await
- {
- warn!(
+ warn!(
"Failed to call remove_job_data on Executor {} due
to {:?}",
executor, err
)
- }
}
});
} else {
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index de20dcdd..d770f984 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -31,7 +31,7 @@ use crate::state::executor_manager::{ExecutorManager,
ExecutorReservation};
use crate::state::session_manager::SessionManager;
use crate::state::task_manager::TaskManager;
-use crate::config::SlotsPolicy;
+use crate::config::SchedulerConfig;
use crate::state::execution_graph::TaskDescription;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::TaskStatus;
@@ -92,6 +92,7 @@ pub(super) struct SchedulerState<T: 'static + AsLogicalPlan,
U: 'static + AsExec
pub task_manager: TaskManager<T, U>,
pub session_manager: SessionManager,
pub codec: BallistaCodec<T, U>,
+ pub config: SchedulerConfig,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerState<T, U> {
@@ -106,7 +107,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
session_builder,
codec,
"localhost:50050".to_owned(),
- SlotsPolicy::Bias,
+ SchedulerConfig::default(),
)
}
@@ -115,10 +116,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
scheduler_name: String,
- slots_policy: SlotsPolicy,
+ config: SchedulerConfig,
) -> Self {
Self {
- executor_manager: ExecutorManager::new(config_client.clone(),
slots_policy),
+ executor_manager: ExecutorManager::new(
+ config_client.clone(),
+ config.executor_slots_policy,
+ ),
task_manager: TaskManager::new(
config_client.clone(),
session_builder,
@@ -127,6 +131,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
),
session_manager: SessionManager::new(config_client,
session_builder),
codec,
+ config,
}
}
@@ -385,7 +390,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
pub(crate) async fn cancel_job(&self, job_id: &str) -> Result<bool> {
info!("Received cancellation request for job {}", job_id);
- match self.task_manager.cancel_job(job_id, 300).await {
+ match self.task_manager.cancel_job(job_id).await {
Ok(tasks) => {
self.executor_manager.cancel_running_tasks(tasks).await.map_err(|e| {
let msg = format!("Error to cancel running tasks when
cancelling job {} due to {:?}", job_id, e);
@@ -401,6 +406,27 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
}
}
}
+
+ /// Spawn delayed futures to clean up the job data on Executors and the job
state on the Scheduler, using the configured clean-up intervals
+ pub(crate) fn clean_up_successful_job(&self, job_id: String) {
+ self.executor_manager.clean_up_job_data_delayed(
+ job_id.clone(),
+ self.config.finished_job_data_clean_up_interval_seconds,
+ );
+ self.task_manager.delete_successful_job_delayed(
+ job_id,
+ self.config.finished_job_state_clean_up_interval_seconds,
+ );
+ }
+
+ /// Clean up the job data on Executors immediately, and spawn a delayed future
to clean up the failed job state on the Scheduler
+ pub(crate) fn clean_up_failed_job(&self, job_id: String) {
+ self.executor_manager.clean_up_job_data(job_id.clone());
+ self.task_manager.clean_up_failed_job_delayed(
+ job_id,
+ self.config.finished_job_state_clean_up_interval_seconds,
+ );
+ }
}
pub async fn with_lock<Out, F: Future<Output = Out>>(
diff --git a/ballista/scheduler/src/state/task_manager.rs
b/ballista/scheduler/src/state/task_manager.rs
index de55fd4c..31b28100 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -27,6 +27,7 @@ use ballista_core::config::BallistaConfig;
use ballista_core::error::BallistaError;
use ballista_core::error::Result;
+use crate::state::backend::Keyspace::{CompletedJobs, FailedJobs};
use crate::state::session_manager::create_datafusion_context;
use ballista_core::serde::protobuf::{
self, job_status, FailedJob, JobStatus, MultiTaskDefinition,
TaskDefinition, TaskId,
@@ -310,11 +311,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
/// Mark a job to success. This will create a key under the CompletedJobs
keyspace
/// and remove the job from ActiveJobs
- pub(crate) async fn succeed_job(
- &self,
- job_id: &str,
- clean_up_interval: u64,
- ) -> Result<()> {
+ pub(crate) async fn succeed_job(&self, job_id: &str) -> Result<()> {
debug!("Moving job {} from Active to Success", job_id);
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
with_lock(lock, self.state.delete(Keyspace::ActiveJobs,
job_id)).await?;
@@ -334,26 +331,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
warn!("Fail to find job {} in the cache", job_id);
}
- // spawn a delayed future to clean up job data on both Scheduler and
Executors
- let state = self.state.clone();
- let job_id_str = job_id.to_owned();
- let active_job_cache = self.active_job_cache.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(clean_up_interval)).await;
- Self::clean_up_job_data(state, active_job_cache, false,
job_id_str).await
- });
-
Ok(())
}
/// Cancel the job and return a Vec of running tasks need to cancel
- pub(crate) async fn cancel_job(
- &self,
- job_id: &str,
- clean_up_interval_in_secs: u64,
- ) -> Result<Vec<RunningTaskInfo>> {
- self.abort_job(job_id, "Cancelled".to_owned(),
clean_up_interval_in_secs)
- .await
+ pub(crate) async fn cancel_job(&self, job_id: &str) ->
Result<Vec<RunningTaskInfo>> {
+ self.abort_job(job_id, "Cancelled".to_owned()).await
}
/// Abort the job and return a Vec of running tasks need to cancel
@@ -361,7 +344,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
&self,
job_id: &str,
failure_reason: String,
- clean_up_interval_in_secs: u64,
) -> Result<Vec<RunningTaskInfo>> {
let locks = self
.state
@@ -388,15 +370,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
vec![]
};
- // spawn a delayed future to clean up job data on both Scheduler and
Executors
- let state = self.state.clone();
- let job_id_str = job_id.to_owned();
- let active_job_cache = self.active_job_cache.clone();
- tokio::spawn(async move {
-
tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
- Self::clean_up_job_data(state, active_job_cache, true,
job_id_str).await
- });
-
Ok(tasks_to_cancel)
}
@@ -406,7 +379,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
&self,
job_id: &str,
failure_reason: String,
- clean_up_interval_in_secs: u64,
) -> Result<()> {
debug!("Moving job {} from Active or Queue to Failed", job_id);
let locks = self
@@ -418,15 +390,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
.await?;
with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
- // spawn a delayed future to clean up job data on Scheduler
- let state = self.state.clone();
- let job_id_str = job_id.to_owned();
- let active_job_cache = self.active_job_cache.clone();
- tokio::spawn(async move {
-
tokio::time::sleep(Duration::from_secs(clean_up_interval_in_secs)).await;
- Self::clean_up_job_data(state, active_job_cache, true,
job_id_str).await
- });
-
Ok(())
}
@@ -821,21 +784,53 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
.collect()
}
- async fn clean_up_job_data(
- state: Arc<dyn StateBackendClient>,
- active_job_cache: ActiveJobCache,
- failed: bool,
+ /// Clean up a failed job in FailedJobs Keyspace by delayed
clean_up_interval seconds
+ pub(crate) fn clean_up_failed_job_delayed(
+ &self,
job_id: String,
- ) -> Result<()> {
- active_job_cache.remove(&job_id);
- let keyspace = if failed {
- Keyspace::FailedJobs
- } else {
- Keyspace::CompletedJobs
- };
+ clean_up_interval: u64,
+ ) {
+ if clean_up_interval == 0 {
+ info!("The interval is 0 and the clean up for the failed job state
{} will not be triggered", job_id);
+ return;
+ }
+ self.delete_from_state_backend_delayed(FailedJobs, job_id,
clean_up_interval)
+ }
+
+ /// Clean up a successful job in CompletedJobs Keyspace by delayed
clean_up_interval seconds
+ pub(crate) fn delete_successful_job_delayed(
+ &self,
+ job_id: String,
+ clean_up_interval: u64,
+ ) {
+ if clean_up_interval == 0 {
+ info!("The interval is 0 and the clean up for the successful job
state {} will not be triggered", job_id);
+ return;
+ }
+ self.delete_from_state_backend_delayed(CompletedJobs, job_id,
clean_up_interval)
+ }
+ /// Clean up entries in some keyspace by delayed clean_up_interval seconds
+ fn delete_from_state_backend_delayed(
+ &self,
+ keyspace: Keyspace,
+ key: String,
+ clean_up_interval: u64,
+ ) {
+ let state = self.state.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(clean_up_interval)).await;
+ Self::delete_from_state_backend(state, keyspace, &key).await
+ });
+ }
+
+ async fn delete_from_state_backend(
+ state: Arc<dyn StateBackendClient>,
+ keyspace: Keyspace,
+ key: &str,
+ ) -> Result<()> {
let lock = state.lock(keyspace.clone(), "").await?;
- with_lock(lock, state.delete(keyspace, &job_id)).await?;
+ with_lock(lock, state.delete(keyspace, key)).await?;
Ok(())
}
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 40bf55f2..d112e7c8 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -19,20 +19,22 @@
# Configuration
+## BallistaContext Configuration Settings
+
Ballista has a number of configuration settings that can be specified when
creating a BallistaContext.
_Example: Specifying configuration options when creating a context_
```rust
let config = BallistaConfig::builder()
- .set("ballista.shuffle.partitions", "200")
- .set("ballista.batch.size", "16384")
- .build()?;
+    .set("ballista.shuffle.partitions", "200")
+    .set("ballista.batch.size", "16384")
+    .build()?;
-let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
+let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
```
-## Ballista Configuration Settings
+### Ballista Configuration Settings
| key | type | default | description
|
| --------------------------------- | ------- | ------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
@@ -46,7 +48,7 @@ let ctx = BallistaContext::remote("localhost", 50050,
&config).await?;
| ballista.with_information_schema | Boolean | true | Determines whether
the `information_schema` should be created in the context. This is necessary
for supporting DDL commands such as `SHOW TABLES`. |
| ballista.plugin_dir | Boolean | true | Specified a path for
plugin files. Dynamic library files in this directory will be loaded when
scheduler state initializes. |
-## DataFusion Configuration Settings
+### DataFusion Configuration Settings
In addition to Ballista-specific configuration settings, the following
DataFusion settings can also be specified.
@@ -58,3 +60,24 @@ In addition to Ballista-specific configuration settings, the
following DataFusio
| datafusion.explain.physical_plan_only | Boolean | false | When
set to true, the explain statement will only print physical plans.
|
| datafusion.optimizer.filter_null_join_keys | Boolean | false | When
set to true, the optimizer will insert filters before a join between a nullable
and non-nullable column to filter out nulls on the nullable side. This filter
can add additional overhead when the file format does not fully support
predicate push down.
|
| datafusion.optimizer.skip_failed_rules | Boolean | true | When
set to true, the logical plan optimizer will produce warning messages if any
optimization rules produce errors and then proceed to the next rule. When set
to false, any rules that produce errors will cause the query to fail.
|
+
+## Ballista Scheduler Configuration Settings
+
+Besides the BallistaContext configuration settings, the Ballista scheduler has a few
+settings of its own that control how it manages the whole cluster.
+
+_Example: Specifying configuration options when starting the scheduler_
+
+```shell
+./ballista-scheduler --scheduler-policy push-staged --event-loop-buffer-size 1000000 \
+    --executor-slots-policy round-robin-local
+```
+
+| key | type | default |
description
|
+| -------------------------------------------- | ------ | ----------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
+| scheduler-policy | Utf8 | pull-staged | Sets
the task scheduling policy for the scheduler, possible values: pull-staged,
push-staged.
|
+| event-loop-buffer-size | UInt32 | 10000 | Sets
the event loop buffer size. For a high-throughput system, a larger value
like 1000000 is recommended.
|
+| executor-slots-policy | Utf8 | bias | Sets
the executor slots policy for the scheduler, possible values: bias,
round-robin, round-robin-local. For a cluster with a single scheduler,
round-robin-local is recommended. |
+| finished-job-data-clean-up-interval-seconds | UInt64 | 300 | Sets
the delay, in seconds, before cleaning up finished job data (mainly the shuffle
data); 0 disables the cleanup.
|
+| finished-job-state-clean-up-interval-seconds | UInt64 | 3600 | Sets
the delay, in seconds, before cleaning up finished job state stored in the backend;
0 disables the cleanup.
|
+| advertise-flight-sql-endpoint | Utf8 | N/A | Sets
the route endpoint for proxying Flight SQL results via the scheduler.
|