This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new cf76969 Make call SchedulerServer::new once in ballista-scheduler process (#1537)
cf76969 is described below
commit cf7696951679b2ef2b6efd45c8bc76e63c4bc8f6
Author: Yang <[email protected]>
AuthorDate: Fri Jan 14 00:57:20 2022 +0800
Make call SchedulerServer::new once in ballista-scheduler process (#1537)
* Make call SchedulerServer::new once in ballista-scheduler process
* cargo fmt
* fix wrong annotation
---
ballista/rust/scheduler/src/lib.rs | 4 ++++
ballista/rust/scheduler/src/main.rs | 15 ++++++++++-----
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 107ea28..61da0d9 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -120,6 +120,10 @@ impl SchedulerServer {
.as_millis(),
}
}
+
+ pub fn set_caller_ip(&mut self, ip: IpAddr) {
+ self.caller_ip = ip;
+ }
}
const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 7b79eb1..23a0386 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -22,6 +22,7 @@ use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerSe
use futures::future::{self, Either, TryFutureExt};
use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
use std::convert::Infallible;
+use std::net::{IpAddr, Ipv4Addr};
use std::{net::SocketAddr, sync::Arc};
use tonic::transport::Server as TonicServer;
use tower::Service;
@@ -62,14 +63,18 @@ async fn start_server(
"Ballista v{} Scheduler listening on {:?}",
BALLISTA_VERSION, addr
);
+ //should only call SchedulerServer::new() once in the process
+ let scheduler_server_without_caller_ip = SchedulerServer::new(
+ config_backend.clone(),
+ namespace.clone(),
+ IpAddr::V4(Ipv4Addr::UNSPECIFIED),
+ );
Ok(Server::bind(&addr)
.serve(make_service_fn(move |request: &AddrStream| {
- let scheduler_server = SchedulerServer::new(
- config_backend.clone(),
- namespace.clone(),
- request.remote_addr().ip(),
- );
+ let mut scheduler_server = scheduler_server_without_caller_ip.clone();
+ scheduler_server.set_caller_ip(request.remote_addr().ip());
+
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone());