yahoNanJing commented on a change in pull request #1810: URL: https://github.com/apache/arrow-datafusion/pull/1810#discussion_r805449691
########## File path: ballista/rust/executor/src/executor_server.rs ########## @@ -164,8 +152,8 @@ impl ExecutorServer { // TODO Error handling self.scheduler .clone() - .send_heart_beat(SendHeartBeatParams { - metadata: Some(self.executor_meta.clone()), + .receive_heart_beat(HeartBeatParams { + executor_id: self.executor.metadata.id.clone(), state: Some(self.get_executor_state().await.into()), }) .await Review comment: Agreed. It's clearer than before. ########## File path: ballista/rust/executor/src/standalone.rs ########## @@ -36,40 +37,43 @@ pub async fn new_standalone_executor( scheduler: SchedulerGrpcClient<Channel>, concurrent_tasks: usize, ) -> Result<()> { - let work_dir = TempDir::new()? - .into_path() - .into_os_string() - .into_string() - .unwrap(); - let executor = Arc::new(Executor::new(&work_dir)); - - let service = BallistaFlightService::new(executor.clone()); - - let server = FlightServiceServer::new(service); // Let the OS assign a random, free port let listener = TcpListener::bind("localhost:0").await?; let addr = listener.local_addr()?; info!( "Ballista v{} Rust Executor listening on {:?}", BALLISTA_VERSION, addr ); - tokio::spawn( - Server::builder().add_service(server).serve_with_incoming( - tokio_stream::wrappers::TcpListenerStream::new(listener), - ), - ); + let executor_meta = ExecutorRegistration { id: Uuid::new_v4().to_string(), // assign this executor a unique ID optional_host: Some(OptionalHost::Host("localhost".to_string())), port: addr.port() as u32, // TODO Make it configurable grpc_port: 50020, Review comment: It's mainly for unit tests. For production execution, it's already configurable. 
########## File path: ballista/rust/scheduler/src/lib.rs ########## @@ -153,24 +154,18 @@ impl SchedulerServer { .as_millis(), policy, scheduler_env, - executors_client: Arc::new(RwLock::new(HashMap::new())), + executors_client, } } + pub async fn init(&self) -> Result<(), BallistaError> { + self.state.init().await?; + + Ok(()) + } + async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> { - let alive_executors = self - .state - .get_alive_executors_metadata_within_one_minute() - .await?; - let alive_executors: HashMap<String, ExecutorMeta> = alive_executors - .into_iter() - .map(|e| (e.id.clone(), e)) - .collect(); - let available_executors = self.state.get_available_executors_data().await?; - let mut available_executors: Vec<ExecutorData> = available_executors - .into_iter() - .filter(|e| alive_executors.contains_key(&e.executor_id)) - .collect(); + let mut available_executors = self.state.get_available_executors_data(); // In case of there's no enough resources, reschedule the tasks of the job if available_executors.is_empty() { Review comment: Agreed. Since the inner sleep is async, it will not block the execution threads, so there is currently no need to spawn another thread for this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org