This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new cb6147f9 Add timeout and keep-alive settings for Grpc Client and
Server (#115)
cb6147f9 is described below
commit cb6147f9fa960ce4c70858d777011612c4a3783a
Author: mingmwang <[email protected]>
AuthorDate: Mon Aug 8 21:06:48 2022 +0800
Add timeout and keep-alive settings for Grpc Client and Server (#115)
* Add timeout and keep-alive settings for Grpc Client
* Add timeout and keep-alive settings for Grpc Server
* move server settings to utils
* fix fmt
* set tcp_nodelay to true explicitly
---
ballista/rust/client/src/context.rs | 7 +++--
ballista/rust/core/src/client.rs | 6 ++--
.../core/src/execution_plans/distributed_query.rs | 6 ++--
.../core/src/execution_plans/shuffle_reader.rs | 2 ++
ballista/rust/core/src/utils.rs | 32 ++++++++++++++++++++++
ballista/rust/executor/src/executor_server.rs | 5 ++--
ballista/rust/executor/src/main.rs | 8 ++++--
ballista/rust/executor/src/standalone.rs | 11 +++++---
ballista/rust/scheduler/src/main.rs | 4 +--
ballista/rust/scheduler/src/standalone.rs | 17 ++++++------
ballista/rust/scheduler/src/state/task_manager.rs | 4 ++-
11 files changed, 76 insertions(+), 26 deletions(-)
diff --git a/ballista/rust/client/src/context.rs
b/ballista/rust/client/src/context.rs
index 03cd0799..32e1f661 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -28,7 +28,9 @@ use std::sync::Arc;
use ballista_core::config::BallistaConfig;
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair};
-use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
+use ballista_core::utils::{
+ create_df_ctx_with_ballista_query_planner, create_grpc_client_connection,
+};
use datafusion_proto::protobuf::LogicalPlanNode;
use datafusion::catalog::TableReference;
@@ -93,9 +95,10 @@ impl BallistaContext {
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
- let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
+ let connection = create_grpc_client_connection(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ let mut scheduler = SchedulerGrpcClient::new(connection);
let remote_session_id = scheduler
.execute_query(ExecuteQueryParams {
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index a5c4a062..dfe2003f 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -39,6 +39,7 @@ use datafusion::arrow::{
record_batch::RecordBatch,
};
+use crate::utils::create_grpc_client_connection;
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use futures::{Stream, StreamExt};
use log::debug;
@@ -57,8 +58,8 @@ impl BallistaClient {
pub async fn try_new(host: &str, port: u16) -> Result<Self> {
let addr = format!("http://{}:{}", host, port);
debug!("BallistaClient connecting to {}", addr);
- let flight_client =
- FlightServiceClient::connect(addr.clone())
+ let connection =
+ create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::General(format!(
@@ -66,6 +67,7 @@ impl BallistaClient {
addr, e
))
})?;
+ let flight_client = FlightServiceClient::new(connection);
debug!("BallistaClient connected OK");
Ok(Self { flight_client })
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs
b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 62e7ff02..11666a8e 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -23,6 +23,7 @@ use crate::serde::protobuf::{
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
PartitionLocation,
};
+use crate::utils::create_grpc_client_connection;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
use datafusion::arrow::record_batch::RecordBatch;
@@ -235,11 +236,12 @@ async fn execute_query(
) -> Result<impl Stream<Item = ArrowResult<RecordBatch>> + Send> {
info!("Connecting to Ballista scheduler at {}", scheduler_url);
// TODO reuse the scheduler to avoid connecting to the Ballista scheduler
again and again
-
- let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
+ let connection = create_grpc_client_connection(scheduler_url)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ let mut scheduler = SchedulerGrpcClient::new(connection);
+
let query_result = scheduler
.execute_query(query)
.await
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 3046a227..c69d120b 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -205,6 +205,8 @@ async fn fetch_partition(
) -> Result<SendableRecordBatchStream> {
let metadata = &location.executor_meta;
let partition_id = &location.partition_id;
+ // TODO for shuffle client connections, we should avoid creating new
connections again and again.
+ // And we should also avoid keeping too many connections alive for a long
time.
let mut ballista_client =
BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
.await
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 7493b1ee..d356703a 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -49,7 +49,10 @@ use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+use std::time::Duration;
use std::{fs::File, pin::Pin};
+use tonic::codegen::StdError;
+use tonic::transport::{Channel, Error, Server};
/// Stream data to disk in Arrow IPC format
@@ -310,3 +313,32 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for
BallistaQueryPlanner<T> {
}
}
}
+
+pub async fn create_grpc_client_connection<D>( // Connects to `dst` and returns a tonic `Channel` configured with the fixed timeout / keep-alive settings below.
+ dst: D, // anything convertible into a `tonic::transport::Endpoint`; callers in this patch pass URL strings
+) -> std::result::Result<Channel, Error> // `Error` is `tonic::transport::Error`, per the `use` line added earlier in this hunk
+where
+ D: std::convert::TryInto<tonic::transport::Endpoint>,
+ D::Error: Into<StdError>,
+{
+ let endpoint = tonic::transport::Endpoint::new(dst)? // `?` returns early if `dst` cannot be turned into an endpoint
+ .connect_timeout(Duration::from_secs(20)) // 20 s limit on connecting
+ .timeout(Duration::from_secs(20)) // 20 s timeout, presumably applied per request — NOTE(review): confirm this suits long-running/streaming calls
+ // Disable Nagle's Algorithm since we don't want packets to wait
+ .tcp_nodelay(true)
+ .tcp_keepalive(Option::Some(Duration::from_secs(3600))) // TCP-level keep-alive set to 1 hour
+ .http2_keep_alive_interval(Duration::from_secs(300)) // HTTP/2 keep-alive interval: 5 minutes
+ .keep_alive_timeout(Duration::from_secs(20)) // 20 s keep-alive timeout — presumably the wait for a ping acknowledgement; confirm in tonic docs
+ .keep_alive_while_idle(true); // keep-alive stays enabled while the connection is idle
+ endpoint.connect().await // performs the connection attempt; its result is returned unchanged
+}
+
+pub fn create_grpc_server() -> Server { // Returns a tonic `Server` builder pre-configured with the settings below; callers in this patch chain `.add_service(..)` and serve on it.
+ Server::builder()
+ .timeout(Duration::from_secs(20)) // 20 s timeout — presumably per request handler; confirm in tonic docs
+ // Disable Nagle's Algorithm since we don't want packets to wait
+ .tcp_nodelay(true)
+ .tcp_keepalive(Option::Some(Duration::from_secs(3600))) // TCP-level keep-alive set to 1 hour
+ .http2_keepalive_interval(Option::Some(Duration::from_secs(300))) // HTTP/2 keep-alive interval: 5 minutes
+ .http2_keepalive_timeout(Option::Some(Duration::from_secs(20))) // 20 s keep-alive timeout, matching the client-side value in this patch
+}
diff --git a/ballista/rust/executor/src/executor_server.rs
b/ballista/rust/executor/src/executor_server.rs
index 7fa48939..e484e05a 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -22,7 +22,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use log::{debug, error, info};
-use tonic::transport::{Channel, Server};
+use tonic::transport::Channel;
use tonic::{Request, Response, Status};
use ballista_core::error::BallistaError;
@@ -39,6 +39,7 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::scheduler::ExecutorState;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use ballista_core::utils::create_grpc_server;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -84,7 +85,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>(
info!("Setup executor grpc service for {:?}", addr);
let server = ExecutorGrpcServer::new(executor_server.clone());
- let grpc_server_future =
Server::builder().add_service(server).serve(addr);
+ let grpc_server_future =
create_grpc_server().add_service(server).serve(addr);
tokio::spawn(async move { grpc_server_future.await });
}
diff --git a/ballista/rust/executor/src/main.rs
b/ballista/rust/executor/src/main.rs
index 6a87be77..cc7f41e3 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -28,7 +28,6 @@ use log::{error, info};
use tempfile::TempDir;
use tokio::fs::ReadDir;
use tokio::{fs, time};
-use tonic::transport::Server;
use uuid::Uuid;
use ballista_core::config::TaskSchedulingPolicy;
@@ -39,6 +38,7 @@ use ballista_core::serde::protobuf::{
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
+use ballista_core::utils::{create_grpc_client_connection, create_grpc_server};
use ballista_core::{print_version, BALLISTA_VERSION};
use ballista_executor::executor::Executor;
use ballista_executor::flight_service::BallistaFlightService;
@@ -134,10 +134,12 @@ async fn main() -> Result<()> {
opt.concurrent_tasks,
));
- let scheduler = SchedulerGrpcClient::connect(scheduler_url)
+ let connection = create_grpc_client_connection(scheduler_url)
.await
.context("Could not connect to scheduler")?;
+ let scheduler = SchedulerGrpcClient::new(connection);
+
let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();
@@ -185,7 +187,7 @@ async fn main() -> Result<()> {
BALLISTA_VERSION, addr
);
let server_future =
- tokio::spawn(Server::builder().add_service(server).serve(addr));
+ tokio::spawn(create_grpc_server().add_service(server).serve(addr));
server_future
.await
.context("Tokio error")?
diff --git a/ballista/rust/executor/src/standalone.rs
b/ballista/rust/executor/src/standalone.rs
index a2685afa..ca5513fa 100644
--- a/ballista/rust/executor/src/standalone.rs
+++ b/ballista/rust/executor/src/standalone.rs
@@ -20,6 +20,7 @@ use crate::{execution_loop, executor::Executor,
flight_service::BallistaFlightSe
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
+use ballista_core::utils::create_grpc_server;
use ballista_core::{
error::Result,
serde::protobuf::executor_registration::OptionalHost,
@@ -32,7 +33,7 @@ use log::info;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
-use tonic::transport::{Channel, Server};
+use tonic::transport::Channel;
use uuid::Uuid;
pub async fn new_standalone_executor<
@@ -84,9 +85,11 @@ pub async fn new_standalone_executor<
let service = BallistaFlightService::new(executor.clone());
let server = FlightServiceServer::new(service);
tokio::spawn(
- Server::builder().add_service(server).serve_with_incoming(
- tokio_stream::wrappers::TcpListenerStream::new(listener),
- ),
+ create_grpc_server()
+ .add_service(server)
+
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+ listener,
+ )),
);
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
diff --git a/ballista/rust/scheduler/src/main.rs
b/ballista/rust/scheduler/src/main.rs
index 61e4ae36..417e1855 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -25,7 +25,6 @@ use hyper::{server::conn::AddrStream,
service::make_service_fn, Server};
use std::convert::Infallible;
use std::{net::SocketAddr, sync::Arc};
use tonic::transport::server::Connected;
-use tonic::transport::Server as TonicServer;
use tower::Service;
use ballista_core::BALLISTA_VERSION;
@@ -60,6 +59,7 @@ mod config {
));
}
+use ballista_core::utils::create_grpc_server;
use ballista_scheduler::flight_sql::FlightSqlServiceImpl;
use config::prelude::*;
use datafusion::execution::context::default_session_builder;
@@ -108,7 +108,7 @@ async fn start_server(
let keda_scaler =
ExternalScalerServer::new(scheduler_server.clone());
- let mut tonic = TonicServer::builder()
+ let mut tonic = create_grpc_server()
.add_service(scheduler_grpc_server)
.add_service(flight_sql_server)
.add_service(keda_scaler)
diff --git a/ballista/rust/scheduler/src/standalone.rs
b/ballista/rust/scheduler/src/standalone.rs
index 4f358a46..0de81b85 100644
--- a/ballista/rust/scheduler/src/standalone.rs
+++ b/ballista/rust/scheduler/src/standalone.rs
@@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use crate::{
+ scheduler_server::SchedulerServer,
state::backend::standalone::StandaloneClient,
+};
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;
+use ballista_core::utils::create_grpc_server;
use ballista_core::{
error::Result, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
BALLISTA_VERSION,
@@ -25,11 +29,6 @@ use datafusion_proto::protobuf::LogicalPlanNode;
use log::info;
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
-use tonic::transport::Server;
-
-use crate::{
- scheduler_server::SchedulerServer,
state::backend::standalone::StandaloneClient,
-};
pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
let client = StandaloneClient::try_new_temporary()?;
@@ -50,9 +49,11 @@ pub async fn new_standalone_scheduler() ->
Result<SocketAddr> {
BALLISTA_VERSION, addr
);
tokio::spawn(
- Server::builder().add_service(server).serve_with_incoming(
- tokio_stream::wrappers::TcpListenerStream::new(listener),
- ),
+ create_grpc_server()
+ .add_service(server)
+
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
+ listener,
+ )),
);
Ok(addr)
diff --git a/ballista/rust/scheduler/src/state/task_manager.rs
b/ballista/rust/scheduler/src/state/task_manager.rs
index e3ceb610..c4c7fe53 100644
--- a/ballista/rust/scheduler/src/state/task_manager.rs
+++ b/ballista/rust/scheduler/src/state/task_manager.rs
@@ -440,7 +440,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
} else {
let executor_id = executor.id.clone();
let executor_url = format!("http://{}:{}", executor.host,
executor.grpc_port);
- let mut client = ExecutorGrpcClient::connect(executor_url).await?;
+ let connection =
+
ballista_core::utils::create_grpc_client_connection(executor_url).await?;
+ let mut client = ExecutorGrpcClient::new(connection);
clients.insert(executor_id, client.clone());
client
.launch_task(protobuf::LaunchTaskParams {