This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b3b35f5 Make max encoding message size configurable (#928)
4b3b35f5 is described below

commit 4b3b35f509d9584f6d3be3c697655dee40e4b1ce
Author: Andy Grove <[email protected]>
AuthorDate: Mon Dec 11 08:42:11 2023 -0700

    Make max encoding message size configurable (#928)
---
 ballista/executor/executor_config_spec.toml   | 6 ++++++
 ballista/executor/src/bin/main.rs             | 1 +
 ballista/executor/src/executor_process.rs     | 2 ++
 ballista/executor/src/executor_server.rs      | 3 +++
 ballista/scheduler/scheduler_config_spec.toml | 6 ++++++
 ballista/scheduler/src/bin/main.rs            | 1 +
 ballista/scheduler/src/config.rs              | 8 ++++++++
 ballista/scheduler/src/scheduler_process.rs   | 3 +++
 8 files changed, 30 insertions(+)

diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml
index 8bed5ac6..209069de 100644
--- a/ballista/executor/executor_config_spec.toml
+++ b/ballista/executor/executor_config_spec.toml
@@ -132,6 +132,12 @@ type = "u32"
 default = "16777216"
 doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
 
+[[param]]
+name = "grpc_server_max_encoding_message_size"
+type = "u32"
+default = "16777216"
+doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB"
+
 [[param]]
 name = "executor_heartbeat_interval_seconds"
 type = "u64"
diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs
index 52bf6348..a7ba8e36 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -79,6 +79,7 @@ async fn main() -> Result<()> {
         job_data_ttl_seconds: opt.job_data_ttl_seconds,
         job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
         grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
+        grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
         executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
         data_cache_policy: opt.data_cache_policy,
         cache_dir: opt.cache_dir,
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index 4672dffb..d3b56978 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -96,6 +96,8 @@ pub struct ExecutorProcessConfig {
     pub cache_io_concurrency: u32,
     /// The maximum size of a decoded message at the grpc server side.
     pub grpc_server_max_decoding_message_size: u32,
+    /// The maximum size of an encoded message at the grpc server side.
+    pub grpc_server_max_encoding_message_size: u32,
     pub executor_heartbeat_interval_seconds: u64,
     /// Optional execution engine to use to execute physical plans, will default to
     /// DataFusion if none is provided.
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index c046558b..90f888cc 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -112,6 +112,9 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
             BALLISTA_VERSION, addr
         );
         let server = ExecutorGrpcServer::new(executor_server.clone())
+            .max_encoding_message_size(
+                config.grpc_server_max_encoding_message_size as usize,
+            )
             .max_decoding_message_size(
                 config.grpc_server_max_decoding_message_size as usize,
             );
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index c9f5154a..857212fe 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -171,6 +171,12 @@ type = "u32"
 default = "16777216"
 doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
 
+[[param]]
+name = "grpc_server_max_encoding_message_size"
+type = "u32"
+default = "16777216"
+doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB"
+
 [[param]]
 name = "executor_timeout_seconds"
 type = "u64"
diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs
index 46ac7cc9..dbdfc747 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -155,6 +155,7 @@ async fn main() -> Result<()> {
         scheduler_event_expected_processing_duration: opt
             .scheduler_event_expected_processing_duration,
         grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
+        grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
         executor_timeout_seconds: opt.executor_timeout_seconds,
         expire_dead_executor_interval_seconds: opt.expire_dead_executor_interval_seconds,
     };
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 7a0c10c4..38ae5ae0 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -56,6 +56,8 @@ pub struct SchedulerConfig {
     pub scheduler_event_expected_processing_duration: u64,
     /// The maximum size of a decoded message at the grpc server side.
     pub grpc_server_max_decoding_message_size: u32,
+    /// The maximum size of an encoded message at the grpc server side.
+    pub grpc_server_max_encoding_message_size: u32,
     /// The executor timeout in seconds. It should be longer than executor's heartbeat intervals.
     pub executor_timeout_seconds: u64,
     /// The interval to check expired or dead executors
@@ -79,6 +81,7 @@ impl Default for SchedulerConfig {
             executor_termination_grace_period: 0,
             scheduler_event_expected_processing_duration: 0,
             grpc_server_max_decoding_message_size: 16777216,
+            grpc_server_max_encoding_message_size: 16777216,
             executor_timeout_seconds: 180,
             expire_dead_executor_interval_seconds: 15,
         }
@@ -167,6 +170,11 @@ impl SchedulerConfig {
         self.grpc_server_max_decoding_message_size = value;
         self
     }
+
+    pub fn with_grpc_server_max_encoding_message_size(mut self, value: u32) -> Self {
+        self.grpc_server_max_encoding_message_size = value;
+        self
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs
index 64ea3072..6bcaaec5 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -75,6 +75,9 @@ pub async fn start_server(
             let config = &scheduler_server.state.config;
             let scheduler_grpc_server =
                 SchedulerGrpcServer::new(scheduler_server.clone())
+                    .max_encoding_message_size(
+                        config.grpc_server_max_encoding_message_size as usize,
+                    )
                     .max_decoding_message_size(
                         config.grpc_server_max_decoding_message_size as usize,
                     );

Reply via email to