This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new cf76969  Make call SchedulerServer::new once in ballista-scheduler process (#1537)
cf76969 is described below

commit cf7696951679b2ef2b6efd45c8bc76e63c4bc8f6
Author: Yang <[email protected]>
AuthorDate: Fri Jan 14 00:57:20 2022 +0800

    Make call SchedulerServer::new once in ballista-scheduler process (#1537)
    
    * Make call SchedulerServer::new once in ballista-scheduler process
    
    * cargo fmt
    
    * fix wrong annotation
---
 ballista/rust/scheduler/src/lib.rs  |  4 ++++
 ballista/rust/scheduler/src/main.rs | 15 ++++++++++-----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 107ea28..61da0d9 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -120,6 +120,10 @@ impl SchedulerServer {
                 .as_millis(),
         }
     }
+
+    pub fn set_caller_ip(&mut self, ip: IpAddr) {
+        self.caller_ip = ip;
+    }
 }
 
 const INFLIGHT_TASKS_METRIC_NAME: &str = "inflight_tasks";
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 7b79eb1..23a0386 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -22,6 +22,7 @@ use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerSe
 use futures::future::{self, Either, TryFutureExt};
 use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
+use std::net::{IpAddr, Ipv4Addr};
 use std::{net::SocketAddr, sync::Arc};
 use tonic::transport::Server as TonicServer;
 use tower::Service;
@@ -62,14 +63,18 @@ async fn start_server(
         "Ballista v{} Scheduler listening on {:?}",
         BALLISTA_VERSION, addr
     );
+    //should only call SchedulerServer::new() once in the process
+    let scheduler_server_without_caller_ip = SchedulerServer::new(
+        config_backend.clone(),
+        namespace.clone(),
+        IpAddr::V4(Ipv4Addr::UNSPECIFIED),
+    );
 
     Ok(Server::bind(&addr)
         .serve(make_service_fn(move |request: &AddrStream| {
-            let scheduler_server = SchedulerServer::new(
-                config_backend.clone(),
-                namespace.clone(),
-                request.remote_addr().ip(),
-            );
+            let mut scheduler_server = scheduler_server_without_caller_ip.clone();
+            scheduler_server.set_caller_ip(request.remote_addr().ip());
+
             let scheduler_grpc_server =
                 SchedulerGrpcServer::new(scheduler_server.clone());
 

Reply via email to