This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c2420c9 Make the scheduler event loop buffer size configurable (#398)
5c2420c9 is described below

commit 5c2420c9b3f3ba1b4c363b2fd93309c93ac851fd
Author: yahoNanJing <[email protected]>
AuthorDate: Fri Oct 21 11:56:06 2022 +0800

    Make the scheduler event loop buffer size configurable (#398)
    
    * Make the scheduler event loop buffer size configurable
    
    * Fix PR review
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/scheduler/scheduler_config_spec.toml   |  6 ++++++
 ballista/scheduler/src/main.rs                  |  5 +++++
 ballista/scheduler/src/scheduler_server/grpc.rs |  3 +++
 ballista/scheduler/src/scheduler_server/mod.rs  | 22 +++++++++++++++++++---
 ballista/scheduler/src/standalone.rs            |  1 +
 5 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index 7549260e..daf7bf60 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -72,6 +72,12 @@ type = "ballista_core::config::TaskSchedulingPolicy"
 doc = "The scheduing policy for the scheduler, possible values: pull-staged, push-staged. Default: pull-staged"
 default = "ballista_core::config::TaskSchedulingPolicy::PullStaged"
 
+[[param]]
+name = "event_loop_buffer_size"
+type = "u32"
+default = "10000"
+doc = "Event loop buffer size. Default: 10000"
+
 [[param]]
 name = "executor_slots_policy"
 type = "ballista_scheduler::config::SlotsPolicy"
diff --git a/ballista/scheduler/src/main.rs b/ballista/scheduler/src/main.rs
index fafdfa7a..0a0c4faf 100644
--- a/ballista/scheduler/src/main.rs
+++ b/ballista/scheduler/src/main.rs
@@ -74,6 +74,7 @@ async fn start_server(
     addr: SocketAddr,
     scheduling_policy: TaskSchedulingPolicy,
     slots_policy: SlotsPolicy,
+    event_loop_buffer_size: usize,
 ) -> Result<()> {
     info!(
         "Ballista v{} Scheduler listening on {:?}",
@@ -93,11 +94,13 @@ async fn start_server(
                 slots_policy,
                 BallistaCodec::default(),
                 default_session_builder,
+                event_loop_buffer_size,
             ),
             _ => SchedulerServer::new(
                 scheduler_name,
                 config_backend.clone(),
                 BallistaCodec::default(),
+                event_loop_buffer_size,
             ),
         };
 
@@ -244,12 +247,14 @@ async fn main() -> Result<()> {
 
     let scheduling_policy: TaskSchedulingPolicy = opt.scheduler_policy;
     let slots_policy: SlotsPolicy = opt.executor_slots_policy;
+    let event_loop_buffer_size = opt.event_loop_buffer_size as usize;
     start_server(
         scheduler_name,
         client,
         addr,
         scheduling_policy,
         slots_policy,
+        event_loop_buffer_size,
     )
     .await?;
     Ok(())
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index ddd212a1..e77e774b 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -582,6 +582,7 @@ mod test {
                 "localhost:50050".to_owned(),
                 state_storage.clone(),
                 BallistaCodec::default(),
+                10000,
             );
         scheduler.init().await?;
         let exec_meta = ExecutorRegistration {
@@ -667,6 +668,7 @@ mod test {
                 "localhost:50050".to_owned(),
                 state_storage.clone(),
                 BallistaCodec::default(),
+                10000,
             );
         scheduler.init().await?;
 
@@ -746,6 +748,7 @@ mod test {
                 "localhost:50050".to_owned(),
                 state_storage.clone(),
                 BallistaCodec::default(),
+                10000,
             );
         scheduler.init().await?;
 
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 883c6d06..44fed7db 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -68,6 +68,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         scheduler_name: String,
         config: Arc<dyn StateBackendClient>,
         codec: BallistaCodec<T, U>,
+        event_loop_buffer_size: usize,
     ) -> Self {
         SchedulerServer::new_with_policy(
             scheduler_name,
@@ -76,6 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             SlotsPolicy::Bias,
             codec,
             default_session_builder,
+            event_loop_buffer_size,
         )
     }
 
@@ -84,6 +86,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         config: Arc<dyn StateBackendClient>,
         codec: BallistaCodec<T, U>,
         session_builder: SessionBuilder,
+        event_loop_buffer_size: usize,
     ) -> Self {
         SchedulerServer::new_with_policy(
             scheduler_name,
@@ -92,6 +95,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             SlotsPolicy::Bias,
             codec,
             session_builder,
+            event_loop_buffer_size,
         )
     }
 
@@ -102,6 +106,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
         slots_policy: SlotsPolicy,
         codec: BallistaCodec<T, U>,
         session_builder: SessionBuilder,
+        event_loop_buffer_size: usize,
     ) -> Self {
         let state = Arc::new(SchedulerState::new(
             config,
@@ -111,18 +116,27 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             slots_policy,
         ));
 
-        SchedulerServer::new_with_state(scheduler_name, scheduling_policy, state)
+        SchedulerServer::new_with_state(
+            scheduler_name,
+            scheduling_policy,
+            state,
+            event_loop_buffer_size,
+        )
     }
 
     pub(crate) fn new_with_state(
         scheduler_name: String,
         policy: TaskSchedulingPolicy,
         state: Arc<SchedulerState<T, U>>,
+        event_loop_buffer_size: usize,
     ) -> Self {
         let query_stage_scheduler =
             Arc::new(QueryStageScheduler::new(state.clone(), policy));
-        let query_stage_event_loop =
-            EventLoop::new("query_stage".to_owned(), 10000, query_stage_scheduler);
+        let query_stage_event_loop = EventLoop::new(
+            "query_stage".to_owned(),
+            event_loop_buffer_size,
+            query_stage_scheduler,
+        );
         Self {
             scheduler_name,
             state,
@@ -770,6 +784,7 @@ mod test {
                 SlotsPolicy::Bias,
                 BallistaCodec::default(),
                 default_session_builder,
+                10000,
             );
         scheduler.init().await?;
 
@@ -789,6 +804,7 @@ mod test {
                 "localhost:50050".to_owned(),
                 TaskSchedulingPolicy::PushStaged,
                 state,
+                10000,
             );
         scheduler.init().await?;
 
diff --git a/ballista/scheduler/src/standalone.rs b/ballista/scheduler/src/standalone.rs
index 4abc70a4..ce09580d 100644
--- a/ballista/scheduler/src/standalone.rs
+++ b/ballista/scheduler/src/standalone.rs
@@ -38,6 +38,7 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
             "localhost:50050".to_owned(),
             Arc::new(client),
             BallistaCodec::default(),
+            10000,
         );
     scheduler_server.init().await?;
     let server = SchedulerGrpcServer::new(scheduler_server.clone());

Reply via email to