yahoNanJing commented on a change in pull request #1810:
URL: https://github.com/apache/arrow-datafusion/pull/1810#discussion_r805449691



##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -164,8 +152,8 @@ impl ExecutorServer {
         // TODO Error handling
         self.scheduler
             .clone()
-            .send_heart_beat(SendHeartBeatParams {
-                metadata: Some(self.executor_meta.clone()),
+            .receive_heart_beat(HeartBeatParams {
+                executor_id: self.executor.metadata.id.clone(),
                 state: Some(self.get_executor_state().await.into()),
             })
             .await

Review comment:
       Agree. It's more clear than before.

##########
File path: ballista/rust/executor/src/standalone.rs
##########
@@ -36,40 +37,43 @@ pub async fn new_standalone_executor(
     scheduler: SchedulerGrpcClient<Channel>,
     concurrent_tasks: usize,
 ) -> Result<()> {
-    let work_dir = TempDir::new()?
-        .into_path()
-        .into_os_string()
-        .into_string()
-        .unwrap();
-    let executor = Arc::new(Executor::new(&work_dir));
-
-    let service = BallistaFlightService::new(executor.clone());
-
-    let server = FlightServiceServer::new(service);
     // Let the OS assign a random, free port
     let listener = TcpListener::bind("localhost:0").await?;
     let addr = listener.local_addr()?;
     info!(
         "Ballista v{} Rust Executor listening on {:?}",
         BALLISTA_VERSION, addr
     );
-    tokio::spawn(
-        Server::builder().add_service(server).serve_with_incoming(
-            tokio_stream::wrappers::TcpListenerStream::new(listener),
-        ),
-    );
+
     let executor_meta = ExecutorRegistration {
         id: Uuid::new_v4().to_string(), // assign this executor a unique ID
         optional_host: Some(OptionalHost::Host("localhost".to_string())),
         port: addr.port() as u32,
         // TODO Make it configurable
         grpc_port: 50020,

Review comment:
       It's mainly for unit test. For production execution, it's already 
configurable.

##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -153,24 +154,18 @@ impl SchedulerServer {
                 .as_millis(),
             policy,
             scheduler_env,
-            executors_client: Arc::new(RwLock::new(HashMap::new())),
+            executors_client,
         }
     }
 
+    pub async fn init(&self) -> Result<(), BallistaError> {
+        self.state.init().await?;
+
+        Ok(())
+    }
+
     async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> {
-        let alive_executors = self
-            .state
-            .get_alive_executors_metadata_within_one_minute()
-            .await?;
-        let alive_executors: HashMap<String, ExecutorMeta> = alive_executors
-            .into_iter()
-            .map(|e| (e.id.clone(), e))
-            .collect();
-        let available_executors = 
self.state.get_available_executors_data().await?;
-        let mut available_executors: Vec<ExecutorData> = available_executors
-            .into_iter()
-            .filter(|e| alive_executors.contains_key(&e.executor_id))
-            .collect();
+        let mut available_executors = 
self.state.get_available_executors_data();
 
         // In case of there's no enough resources, reschedule the tasks of the 
job
         if available_executors.is_empty() {

Review comment:
       Agree. Since the inner sleep is async and will not block the execution 
threads. Currently it's no need to spawn another thread for this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to