This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 5c2420c9 Make the scheduler event loop buffer size configurable (#398)
5c2420c9 is described below
commit 5c2420c9b3f3ba1b4c363b2fd93309c93ac851fd
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Oct 21 11:56:06 2022 +0800
Make the scheduler event loop buffer size configurable (#398)
* Make the scheduler event loop buffer size configurable
* Fix PR review
Co-authored-by: yangzhong <[email protected]>
---
ballista/scheduler/scheduler_config_spec.toml | 6 ++++++
ballista/scheduler/src/main.rs | 5 +++++
ballista/scheduler/src/scheduler_server/grpc.rs | 3 +++
ballista/scheduler/src/scheduler_server/mod.rs | 22 +++++++++++++++++++---
ballista/scheduler/src/standalone.rs | 1 +
5 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 7549260e..daf7bf60 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -72,6 +72,12 @@ type = "ballista_core::config::TaskSchedulingPolicy"
doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
+[[param]]
+name = "event_loop_buffer_size"
+type = "u32"
+default = "10000"
+doc = "Event loop buffer size. Default: 10000"
+
[[param]]
name = "executor_slots_policy"
type = "ballista_scheduler::config::SlotsPolicy"
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index fafdfa7a..0a0c4faf 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -74,6 +74,7 @@ async fn start_server(
addr: SocketAddr,
scheduling_policy: TaskSchedulingPolicy,
slots_policy: SlotsPolicy,
+ event_loop_buffer_size: usize,
) -> Result<()> {
info!(
"Ballista v{} Scheduler listening on {:?}",
@@ -93,11 +94,13 @@ async fn start_server(
slots_policy,
BallistaCodec::default(),
default_session_builder,
+ event_loop_buffer_size,
),
_ => SchedulerServer::new(
scheduler_name,
config_backend.clone(),
BallistaCodec::default(),
+ event_loop_buffer_size,
),
};
@@ -244,12 +247,14 @@ async fn main() -> Result<()> {
let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
let slots_policy: SlotsPolicy = opt.executor_slots_policy;
+ let event_loop_buffer_size = opt.event_loop_buffer_size as usize;
start_server(
scheduler_name,
client,
addr,
scheduling_policy,
slots_policy,
+ event_loop_buffer_size,
)
.await?;
Ok(())
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index ddd212a1..e77e774b 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -582,6 +582,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
+ 10000,
);
scheduler.init().await?;
let exec_meta = ExecutorRegistration {
@@ -667,6 +668,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
+ 10000,
);
scheduler.init().await?;
@@ -746,6 +748,7 @@ mod test {
"localhost:50050".to_owned(),
state_storage.clone(),
BallistaCodec::default(),
+ 10000,
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 883c6d06..44fed7db 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -68,6 +68,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
scheduler_name: String,
config: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
+ event_loop_buffer_size: usize,
) -> Self {
SchedulerServer::new_with_policy(
scheduler_name,
@@ -76,6 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
SlotsPolicy::Bias,
codec,
default_session_builder,
+ event_loop_buffer_size,
)
}
@@ -84,6 +86,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
config: Arc<dyn StateBackendClient>,
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
+ event_loop_buffer_size: usize,
) -> Self {
SchedulerServer::new_with_policy(
scheduler_name,
@@ -92,6 +95,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
SlotsPolicy::Bias,
codec,
session_builder,
+ event_loop_buffer_size,
)
}
@@ -102,6 +106,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
slots_policy: SlotsPolicy,
codec: BallistaCodec<T, U>,
session_builder: SessionBuilder,
+ event_loop_buffer_size: usize,
) -> Self {
let state = Arc::new(SchedulerState::new(
config,
@@ -111,18 +116,27 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
slots_policy,
));
- SchedulerServer::new_with_state(scheduler_name, scheduling_policy, state)
+ SchedulerServer::new_with_state(
+ scheduler_name,
+ scheduling_policy,
+ state,
+ event_loop_buffer_size,
+ )
}
pub(crate) fn new_with_state(
scheduler_name: String,
policy: TaskSchedulingPolicy,
state: Arc<SchedulerState<T, U>>,
+ event_loop_buffer_size: usize,
) -> Self {
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(state.clone(), policy));
- let query_stage_event_loop =
- EventLoop::new("query_stage".to_owned(), 10000, query_stage_scheduler);
+ let query_stage_event_loop = EventLoop::new(
+ "query_stage".to_owned(),
+ event_loop_buffer_size,
+ query_stage_scheduler,
+ );
Self {
scheduler_name,
state,
@@ -770,6 +784,7 @@ mod test {
SlotsPolicy::Bias,
BallistaCodec::default(),
default_session_builder,
+ 10000,
);
scheduler.init().await?;
@@ -789,6 +804,7 @@ mod test {
"localhost:50050".to_owned(),
TaskSchedulingPolicy::PushStaged,
state,
+ 10000,
);
scheduler.init().await?;
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index 4abc70a4..ce09580d 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -38,6 +38,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
"localhost:50050".to_owned(),
Arc::new(client),
BallistaCodec::default(),
+ 10000,
);
scheduler_server.init().await?;
let server = SchedulerGrpcServer::new(scheduler_server.clone());