This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 4b3b35f5 Make max encoding message size configurable (#928)
4b3b35f5 is described below
commit 4b3b35f509d9584f6d3be3c697655dee40e4b1ce
Author: Andy Grove <[email protected]>
AuthorDate: Mon Dec 11 08:42:11 2023 -0700
Make max encoding message size configurable (#928)
---
ballista/executor/executor_config_spec.toml | 6 ++++++
ballista/executor/src/bin/main.rs | 1 +
ballista/executor/src/executor_process.rs | 2 ++
ballista/executor/src/executor_server.rs | 3 +++
ballista/scheduler/scheduler_config_spec.toml | 6 ++++++
ballista/scheduler/src/bin/main.rs | 1 +
ballista/scheduler/src/config.rs | 8 ++++++++
ballista/scheduler/src/scheduler_process.rs | 3 +++
8 files changed, 30 insertions(+)
diff --git a/ballista/executor/executor_config_spec.toml
b/ballista/executor/executor_config_spec.toml
index 8bed5ac6..209069de 100644
--- a/ballista/executor/executor_config_spec.toml
+++ b/ballista/executor/executor_config_spec.toml
@@ -132,6 +132,12 @@ type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
+[[param]]
+name = "grpc_server_max_encoding_message_size"
+type = "u32"
+default = "16777216"
+doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB"
+
[[param]]
name = "executor_heartbeat_interval_seconds"
type = "u64"
diff --git a/ballista/executor/src/bin/main.rs
b/ballista/executor/src/bin/main.rs
index 52bf6348..a7ba8e36 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -79,6 +79,7 @@ async fn main() -> Result<()> {
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds:
opt.job_data_clean_up_interval_seconds,
grpc_server_max_decoding_message_size:
opt.grpc_server_max_decoding_message_size,
+ grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds:
opt.executor_heartbeat_interval_seconds,
data_cache_policy: opt.data_cache_policy,
cache_dir: opt.cache_dir,
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index 4672dffb..d3b56978 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -96,6 +96,8 @@ pub struct ExecutorProcessConfig {
pub cache_io_concurrency: u32,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
+ /// The maximum size of an encoded message at the grpc server side.
+ pub grpc_server_max_encoding_message_size: u32,
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will
default to
/// DataFusion if none is provided.
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index c046558b..90f888cc 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -112,6 +112,9 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static
+ AsExecutionPlan>(
BALLISTA_VERSION, addr
);
let server = ExecutorGrpcServer::new(executor_server.clone())
+ .max_encoding_message_size(
+ config.grpc_server_max_encoding_message_size as usize,
+ )
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
);
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index c9f5154a..857212fe 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -171,6 +171,12 @@ type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"
+[[param]]
+name = "grpc_server_max_encoding_message_size"
+type = "u32"
+default = "16777216"
+doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB"
+
[[param]]
name = "executor_timeout_seconds"
type = "u64"
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index 46ac7cc9..dbdfc747 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -155,6 +155,7 @@ async fn main() -> Result<()> {
scheduler_event_expected_processing_duration: opt
.scheduler_event_expected_processing_duration,
grpc_server_max_decoding_message_size:
opt.grpc_server_max_decoding_message_size,
+ grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_timeout_seconds: opt.executor_timeout_seconds,
expire_dead_executor_interval_seconds:
opt.expire_dead_executor_interval_seconds,
};
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 7a0c10c4..38ae5ae0 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -56,6 +56,8 @@ pub struct SchedulerConfig {
pub scheduler_event_expected_processing_duration: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
+ /// The maximum size of an encoded message at the grpc server side.
+ pub grpc_server_max_encoding_message_size: u32,
/// The executor timeout in seconds. It should be longer than executor's
heartbeat intervals.
pub executor_timeout_seconds: u64,
/// The interval to check expired or dead executors
@@ -79,6 +81,7 @@ impl Default for SchedulerConfig {
executor_termination_grace_period: 0,
scheduler_event_expected_processing_duration: 0,
grpc_server_max_decoding_message_size: 16777216,
+ grpc_server_max_encoding_message_size: 16777216,
executor_timeout_seconds: 180,
expire_dead_executor_interval_seconds: 15,
}
@@ -167,6 +170,11 @@ impl SchedulerConfig {
self.grpc_server_max_decoding_message_size = value;
self
}
+
+ pub fn with_grpc_server_max_encoding_message_size(mut self, value: u32) -> Self {
+ self.grpc_server_max_encoding_message_size = value;
+ self
+ }
}
#[derive(Clone, Debug)]
diff --git a/ballista/scheduler/src/scheduler_process.rs
b/ballista/scheduler/src/scheduler_process.rs
index 64ea3072..6bcaaec5 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -75,6 +75,9 @@ pub async fn start_server(
let config = &scheduler_server.state.config;
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone())
+ .max_encoding_message_size(
+ config.grpc_server_max_encoding_message_size as usize,
+ )
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
);