This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new cb6147f9 Add timeout and keep-alive settings for Grpc Client and Server (#115)
cb6147f9 is described below

commit cb6147f9fa960ce4c70858d777011612c4a3783a
Author: mingmwang <[email protected]>
AuthorDate: Mon Aug 8 21:06:48 2022 +0800

    Add timeout and keep-alive settings for Grpc Client and Server (#115)
    
    * Add timeout and keep-alive settings for Grpc Client
    
    * Add timeout and keep-alive settings for Grpc Server
    
    * move server settings to utils
    
    * fix fmt
    
    * set tcp_nodelay to true explicitly
---
 ballista/rust/client/src/context.rs                |  7 +++--
 ballista/rust/core/src/client.rs                   |  6 ++--
 .../core/src/execution_plans/distributed_query.rs  |  6 ++--
 .../core/src/execution_plans/shuffle_reader.rs     |  2 ++
 ballista/rust/core/src/utils.rs                    | 32 ++++++++++++++++++++++
 ballista/rust/executor/src/executor_server.rs      |  5 ++--
 ballista/rust/executor/src/main.rs                 |  8 ++++--
 ballista/rust/executor/src/standalone.rs           | 11 +++++---
 ballista/rust/scheduler/src/main.rs                |  4 +--
 ballista/rust/scheduler/src/standalone.rs          | 17 ++++++------
 ballista/rust/scheduler/src/state/task_manager.rs  |  4 ++-
 11 files changed, 76 insertions(+), 26 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 03cd0799..32e1f661 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -28,7 +28,9 @@ use std::sync::Arc;
 use ballista_core::config::BallistaConfig;
 use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
 use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair};
-use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
+use ballista_core::utils::{
+    create_df_ctx_with_ballista_query_planner, create_grpc_client_connection,
+};
 use datafusion_proto::protobuf::LogicalPlanNode;
 
 use datafusion::catalog::TableReference;
@@ -93,9 +95,10 @@ impl BallistaContext {
             "Connecting to Ballista scheduler at {}",
             scheduler_url.clone()
         );
-        let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
+        let connection = create_grpc_client_connection(scheduler_url.clone())
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        let mut scheduler = SchedulerGrpcClient::new(connection);
 
         let remote_session_id = scheduler
             .execute_query(ExecuteQueryParams {
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index a5c4a062..dfe2003f 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -39,6 +39,7 @@ use datafusion::arrow::{
     record_batch::RecordBatch,
 };
 
+use crate::utils::create_grpc_client_connection;
 use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::{Stream, StreamExt};
 use log::debug;
@@ -57,8 +58,8 @@ impl BallistaClient {
     pub async fn try_new(host: &str, port: u16) -> Result<Self> {
         let addr = format!("http://{}:{}", host, port);
         debug!("BallistaClient connecting to {}", addr);
-        let flight_client =
-            FlightServiceClient::connect(addr.clone())
+        let connection =
+            create_grpc_client_connection(addr.clone())
                 .await
                 .map_err(|e| {
                     BallistaError::General(format!(
@@ -66,6 +67,7 @@ impl BallistaClient {
                         addr, e
                     ))
                 })?;
+        let flight_client = FlightServiceClient::new(connection);
         debug!("BallistaClient connected OK");
 
         Ok(Self { flight_client })
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 62e7ff02..11666a8e 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -23,6 +23,7 @@ use crate::serde::protobuf::{
     ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
     PartitionLocation,
 };
+use crate::utils::create_grpc_client_connection;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
 use datafusion::arrow::record_batch::RecordBatch;
@@ -235,11 +236,12 @@ async fn execute_query(
 ) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
     info!("Connecting to Ballista scheduler at {}", scheduler_url);
     // TODO reuse the scheduler to avoid connecting to the Ballista scheduler again and again
-
-    let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
+    let connection = create_grpc_client_connection(scheduler_url)
         .await
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
+    let mut scheduler = SchedulerGrpcClient::new(connection);
+
     let query_result = scheduler
         .execute_query(query)
         .await
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 3046a227..c69d120b 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -205,6 +205,8 @@ async fn fetch_partition(
 ) -> Result<SendableRecordBatchStream> {
     let metadata = &location.executor_meta;
     let partition_id = &location.partition_id;
+    // TODO for shuffle client connections, we should avoid creating new connections again and again.
+    // And we should also avoid to keep alive too many connections for long time.
     let mut ballista_client =
         BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
             .await
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 7493b1ee..d356703a 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -49,7 +49,10 @@ use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
+use std::time::Duration;
 use std::{fs::File, pin::Pin};
+use tonic::codegen::StdError;
+use tonic::transport::{Channel, Error, Server};
 
 /// Stream data to disk in Arrow IPC format
 
@@ -310,3 +313,59 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
         }
     }
 }
+
+/// Timeout for establishing a new gRPC client connection.
+const GRPC_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
+/// Per-request timeout applied by Ballista gRPC clients and servers.
+const GRPC_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
+/// Idle time before TCP keep-alive probes are sent on gRPC sockets.
+const GRPC_TCP_KEEPALIVE: Duration = Duration::from_secs(3600);
+/// Interval between HTTP/2 keep-alive pings on gRPC connections.
+const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(300);
+/// How long to wait for an HTTP/2 keep-alive ping to be acknowledged before
+/// the connection is closed.
+const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
+
+/// Connects to the gRPC endpoint `dst` using Ballista's standard client
+/// settings (connect/request timeouts, TCP_NODELAY, TCP and HTTP/2 keep-alive).
+///
+/// The returned [`Channel`] is meant to be wrapped by a generated tonic client,
+/// e.g. `SchedulerGrpcClient::new(channel)`.
+///
+/// # Errors
+///
+/// Returns an error if `dst` cannot be converted into an endpoint or if the
+/// connection cannot be established.
+pub async fn create_grpc_client_connection<D>(
+    dst: D,
+) -> std::result::Result<Channel, Error>
+where
+    D: std::convert::TryInto<tonic::transport::Endpoint>,
+    D::Error: Into<StdError>,
+{
+    let endpoint = tonic::transport::Endpoint::new(dst)?
+        .connect_timeout(GRPC_CONNECT_TIMEOUT)
+        .timeout(GRPC_REQUEST_TIMEOUT)
+        // Disable Nagle's Algorithm since we don't want packets to wait
+        .tcp_nodelay(true)
+        .tcp_keepalive(Some(GRPC_TCP_KEEPALIVE))
+        .http2_keep_alive_interval(GRPC_HTTP2_KEEPALIVE_INTERVAL)
+        .keep_alive_timeout(GRPC_HTTP2_KEEPALIVE_TIMEOUT)
+        // Send HTTP/2 pings even when no request is in flight, so broken
+        // idle connections are detected.
+        .keep_alive_while_idle(true);
+    endpoint.connect().await
+}
+
+/// Creates a tonic [`Server`] builder with Ballista's standard server settings
+/// (request timeout, TCP_NODELAY, TCP and HTTP/2 keep-alive), using the same
+/// values as [`create_grpc_client_connection`].
+pub fn create_grpc_server() -> Server {
+    Server::builder()
+        .timeout(GRPC_REQUEST_TIMEOUT)
+        // Disable Nagle's Algorithm since we don't want packets to wait
+        .tcp_nodelay(true)
+        .tcp_keepalive(Some(GRPC_TCP_KEEPALIVE))
+        .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
+        .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
+}
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 7fa48939..e484e05a 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -22,7 +22,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
 
 use log::{debug, error, info};
-use tonic::transport::{Channel, Server};
+use tonic::transport::Channel;
 use tonic::{Request, Response, Status};
 
 use ballista_core::error::BallistaError;
@@ -39,6 +39,7 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::scheduler::ExecutorState;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use ballista_core::utils::create_grpc_server;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -84,7 +85,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
         info!("Setup executor grpc service for {:?}", addr);
 
         let server = ExecutorGrpcServer::new(executor_server.clone());
-        let grpc_server_future = Server::builder().add_service(server).serve(addr);
+        let grpc_server_future = create_grpc_server().add_service(server).serve(addr);
         tokio::spawn(async move { grpc_server_future.await });
     }
 
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 6a87be77..cc7f41e3 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -28,7 +28,6 @@ use log::{error, info};
 use tempfile::TempDir;
 use tokio::fs::ReadDir;
 use tokio::{fs, time};
-use tonic::transport::Server;
 use uuid::Uuid;
 
 use ballista_core::config::TaskSchedulingPolicy;
@@ -39,6 +38,7 @@ use ballista_core::serde::protobuf::{
 };
 use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::BallistaCodec;
+use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
 use ballista_core::{print_version, BALLISTA_VERSION};
 use ballista_executor::executor::Executor;
 use ballista_executor::flight_service::BallistaFlightService;
@@ -134,10 +134,12 @@ async fn main() -> Result<()> {
         opt.concurrent_tasks,
     ));
 
-    let scheduler = SchedulerGrpcClient::connect(scheduler_url)
+    let connection = create_grpc_client_connection(scheduler_url)
         .await
         .context("Could not connect to scheduler")?;
 
+    let scheduler = SchedulerGrpcClient::new(connection);
+
     let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
         BallistaCodec::default();
 
@@ -185,7 +187,7 @@ async fn main() -> Result<()> {
             BALLISTA_VERSION, addr
         );
         let server_future =
-            tokio::spawn(Server::builder().add_service(server).serve(addr));
+            tokio::spawn(create_grpc_server().add_service(server).serve(addr));
         server_future
             .await
             .context("Tokio error")?
diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs
index a2685afa..ca5513fa 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -20,6 +20,7 @@ use crate::{execution_loop, executor::Executor, flight_service::BallistaFlightSe
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use ballista_core::utils::create_grpc_server;
 use ballista_core::{
     error::Result,
     serde::protobuf::executor_registration::OptionalHost,
@@ -32,7 +33,7 @@ use log::info;
 use std::sync::Arc;
 use tempfile::TempDir;
 use tokio::net::TcpListener;
-use tonic::transport::{Channel, Server};
+use tonic::transport::Channel;
 use uuid::Uuid;
 
 pub async fn new_standalone_executor<
@@ -84,9 +85,11 @@ pub async fn new_standalone_executor<
     let service = BallistaFlightService::new(executor.clone());
     let server = FlightServiceServer::new(service);
     tokio::spawn(
-        Server::builder().add_service(server).serve_with_incoming(
-            tokio_stream::wrappers::TcpListenerStream::new(listener),
-        ),
+        create_grpc_server()
+            .add_service(server)
+            .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+                listener,
+            )),
     );
 
     tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 61e4ae36..417e1855 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -25,7 +25,6 @@ use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
 use std::{net::SocketAddr, sync::Arc};
 use tonic::transport::server::Connected;
-use tonic::transport::Server as TonicServer;
 use tower::Service;
 
 use ballista_core::BALLISTA_VERSION;
@@ -60,6 +59,7 @@ mod config {
     ));
 }
 
+use ballista_core::utils::create_grpc_server;
 use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
 use config::prelude::*;
 use datafusion::execution::context::default_session_builder;
@@ -108,7 +108,7 @@ async fn start_server(
 
             let keda_scaler = ExternalScalerServer::new(scheduler_server.clone());
 
-            let mut tonic = TonicServer::builder()
+            let mut tonic = create_grpc_server()
                 .add_service(scheduler_grpc_server)
                 .add_service(flight_sql_server)
                 .add_service(keda_scaler)
diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs
index 4f358a46..0de81b85 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -15,8 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::{
+    scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient,
+};
 use ballista_core::serde::protobuf::PhysicalPlanNode;
 use ballista_core::serde::BallistaCodec;
+use ballista_core::utils::create_grpc_server;
 use ballista_core::{
     error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
     BALLISTA_VERSION,
@@ -25,11 +29,6 @@ use datafusion_proto::protobuf::LogicalPlanNode;
 use log::info;
 use std::{net::SocketAddr, sync::Arc};
 use tokio::net::TcpListener;
-use tonic::transport::Server;
-
-use crate::{
-    scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient,
-};
 
 pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
     let client = StandaloneClient::try_new_temporary()?;
@@ -50,9 +49,11 @@ pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
         BALLISTA_VERSION, addr
     );
     tokio::spawn(
-        Server::builder().add_service(server).serve_with_incoming(
-            tokio_stream::wrappers::TcpListenerStream::new(listener),
-        ),
+        create_grpc_server()
+            .add_service(server)
+            .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+                listener,
+            )),
     );
 
     Ok(addr)
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs b/ballista/rust/scheduler/src/state/task_manager.rs
index e3ceb610..c4c7fe53 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -440,7 +440,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
         } else {
             let executor_id = executor.id.clone();
             let executor_url = format!("http://{}:{}", executor.host, executor.grpc_port);
-            let mut client = ExecutorGrpcClient::connect(executor_url).await?;
+            let connection =
+                ballista_core::utils::create_grpc_client_connection(executor_url).await?;
+            let mut client = ExecutorGrpcClient::new(connection);
             clients.insert(executor_id, client.clone());
             client
                 .launch_task(protobuf::LaunchTaskParams {

Reply via email to