milenkovicm commented on code in PR #1337:
URL: 
https://github.com/apache/datafusion-ballista/pull/1337#discussion_r2475452402


##########
ballista/core/src/utils.rs:
##########
@@ -36,6 +37,64 @@ use std::{fs::File, pin::Pin};
 use tonic::codegen::StdError;
 use tonic::transport::{Channel, Error, Server};
 
+#[derive(Debug, Clone)]
+pub struct GrpcClientConfig {
+    pub connect_timeout_seconds: u64,
+    pub timeout_seconds: u64,
+    pub tcp_keepalive_seconds: u64,
+    pub http2_keepalive_interval_seconds: u64,
+    pub keepalive_timeout_seconds: u64,
+}
+
+impl GrpcClientConfig {
+    pub fn from_ballista_config(config: &BallistaConfig) -> Self {

Review Comment:
   Would it make sense to implement this as `From` or `Into`? I'm not sure which 
one would be better.



##########
ballista/scheduler/src/state/executor_manager.rs:
##########
@@ -427,7 +429,9 @@ impl ExecutorManager {
                 "http://{}:{}",
                 executor_metadata.host, executor_metadata.grpc_port
             );
-            let connection = 
create_grpc_client_connection(executor_url).await?;
+            let grpc_config = GrpcClientConfig::default();

Review Comment:
   Can this be a method parameter, so that we can use the `BallistaConfig` 
values instead of the default values here?



##########
ballista/core/src/utils.rs:
##########
@@ -36,6 +37,64 @@ use std::{fs::File, pin::Pin};
 use tonic::codegen::StdError;
 use tonic::transport::{Channel, Error, Server};
 
+#[derive(Debug, Clone)]
+pub struct GrpcClientConfig {
+    pub connect_timeout_seconds: u64,
+    pub timeout_seconds: u64,
+    pub tcp_keepalive_seconds: u64,
+    pub http2_keepalive_interval_seconds: u64,
+    pub keepalive_timeout_seconds: u64,
+}
+
+impl GrpcClientConfig {
+    pub fn from_ballista_config(config: &BallistaConfig) -> Self {
+        Self {
+            connect_timeout_seconds: 
config.default_grpc_client_connect_timeout_seconds()
+                as u64,
+            timeout_seconds: config.default_grpc_client_timeout_seconds() as 
u64,
+            tcp_keepalive_seconds: 
config.default_grpc_client_tcp_keepalive_seconds()
+                as u64,
+            http2_keepalive_interval_seconds: config
+                .default_grpc_client_http2_keepalive_interval_seconds()
+                as u64,
+            keepalive_timeout_seconds: config
+                .default_standalone_grpc_client_keepalive_timeout_seconds()
+                as u64,
+        }
+    }
+}
+
+impl Default for GrpcClientConfig {
+    fn default() -> Self {
+        Self {
+            connect_timeout_seconds: 20,
+            timeout_seconds: 20,
+            tcp_keepalive_seconds: 3600,
+            http2_keepalive_interval_seconds: 300,
+            keepalive_timeout_seconds: 20,
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct GrpcServerConfig {

Review Comment:
   Could you please add some docs here? We will try to catch up with 
documentation at some point, so it would help.



##########
ballista/core/src/config.rs:
##########
@@ -39,6 +39,18 @@ pub const BALLISTA_SHUFFLE_READER_FORCE_REMOTE_READ: &str =
 pub const BALLISTA_SHUFFLE_READER_REMOTE_PREFER_FLIGHT: &str =
     "ballista.shuffle.remote_read_prefer_flight";
 
+// gRPC client timeout configurations
+pub const BALLISTA_GRPC_CLIENT_CONNECT_TIMEOUT_SECONDS: &str =
+    "ballista.grpc.client.connect_timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TIMEOUT_SECONDS: &str =
+    "ballista.grpc.client.timeout_seconds";
+pub const BALLISTA_GRPC_CLIENT_TCP_KEEPALIVE_SECONDS: &str =
+    "ballista.grpc.client.tcp_keepalive_seconds";
+pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
+    "ballista.grpc.client.http2_keepalive_interval_seconds";
+pub const BALLISTA_STANDALONE_GRPC_CLIENT_KEEPALIVE_TIMEOUT_SECONDS: &str =

Review Comment:
   Do we need the standalone config? Standalone is used just for local testing, 
so maybe we can live without it. It would make the configuration options 
simpler. WDYT?



##########
ballista/core/src/utils.rs:
##########
@@ -36,6 +37,64 @@ use std::{fs::File, pin::Pin};
 use tonic::codegen::StdError;
 use tonic::transport::{Channel, Error, Server};
 
+#[derive(Debug, Clone)]
+pub struct GrpcClientConfig {

Review Comment:
   Could you please add some docs here? We will try to catch up with 
documentation at some point, so it would help.



##########
ballista/scheduler/src/standalone.rs:
##########
@@ -92,8 +93,9 @@ pub async fn new_standalone_scheduler_with_builder(
     let listener = TcpListener::bind("localhost:0").await?;
     let addr = listener.local_addr()?;
     info!("Ballista v{BALLISTA_VERSION} Rust Scheduler listening on {addr:?}");
+    let grpc_server_config = GrpcServerConfig::default();
     tokio::spawn(
-        create_grpc_server()
+        create_grpc_server(&grpc_server_config)

Review Comment:
   I would suggest
   
   ```rust
   create_grpc_server(&GrpcServerConfig::default())
   ```
   
   if possible 



##########
ballista/executor/src/standalone.rs:
##########
@@ -123,8 +123,9 @@ pub async fn new_standalone_executor_from_builder(
         .max_decoding_message_size(max_message_size)
         .max_encoding_message_size(max_message_size);
 
+    let grpc_server_config = GrpcServerConfig::default();

Review Comment:
   Can you avoid the variable here and create the config at the method call?
   
   ```rust
   create_grpc_server(&GrpcServerConfig::default())
   ```



##########
ballista/client/tests/context_checks.rs:
##########
@@ -240,12 +240,12 @@ mod supported {
             .await?;
 
         let expected = [
-            "+---------------------------------------+----------+",
-            "| name                                  | value    |",
-            "+---------------------------------------+----------+",
-            "| ballista.grpc_client_max_message_size | 16777216 |",
-            "| ballista.job.name                     |          |",
-            "+---------------------------------------+----------+",
+            
"+-------------------------------------------------------+-------+",
+            "| name                                                  | value 
|",
+            
"+-------------------------------------------------------+-------+",

Review Comment:
   Can we please update the query to return only `ballista.job.name`, so we 
don't have to change the test every time we add a new option?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to