yahoNanJing commented on a change in pull request #1810:
URL: https://github.com/apache/arrow-datafusion/pull/1810#discussion_r805449691
##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -164,8 +152,8 @@ impl ExecutorServer {
// TODO Error handling
self.scheduler
.clone()
- .send_heart_beat(SendHeartBeatParams {
- metadata: Some(self.executor_meta.clone()),
+ .receive_heart_beat(HeartBeatParams {
+ executor_id: self.executor.metadata.id.clone(),
state: Some(self.get_executor_state().await.into()),
})
.await
Review comment:
Agreed. It's clearer than before.
##########
File path: ballista/rust/executor/src/standalone.rs
##########
@@ -36,40 +37,43 @@ pub async fn new_standalone_executor(
scheduler: SchedulerGrpcClient<Channel>,
concurrent_tasks: usize,
) -> Result<()> {
- let work_dir = TempDir::new()?
- .into_path()
- .into_os_string()
- .into_string()
- .unwrap();
- let executor = Arc::new(Executor::new(&work_dir));
-
- let service = BallistaFlightService::new(executor.clone());
-
- let server = FlightServiceServer::new(service);
// Let the OS assign a random, free port
let listener = TcpListener::bind("localhost:0").await?;
let addr = listener.local_addr()?;
info!(
"Ballista v{} Rust Executor listening on {:?}",
BALLISTA_VERSION, addr
);
- tokio::spawn(
- Server::builder().add_service(server).serve_with_incoming(
- tokio_stream::wrappers::TcpListenerStream::new(listener),
- ),
- );
+
let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
optional_host: Some(OptionalHost::Host("localhost".to_string())),
port: addr.port() as u32,
// TODO Make it configurable
grpc_port: 50020,
Review comment:
It's mainly for unit tests. For production execution, it's already
configurable.
##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -153,24 +154,18 @@ impl SchedulerServer {
.as_millis(),
policy,
scheduler_env,
- executors_client: Arc::new(RwLock::new(HashMap::new())),
+ executors_client,
}
}
+ pub async fn init(&self) -> Result<(), BallistaError> {
+ self.state.init().await?;
+
+ Ok(())
+ }
+
async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> {
- let alive_executors = self
- .state
- .get_alive_executors_metadata_within_one_minute()
- .await?;
- let alive_executors: HashMap<String, ExecutorMeta> = alive_executors
- .into_iter()
- .map(|e| (e.id.clone(), e))
- .collect();
- let available_executors =
self.state.get_available_executors_data().await?;
- let mut available_executors: Vec<ExecutorData> = available_executors
- .into_iter()
- .filter(|e| alive_executors.contains_key(&e.executor_id))
- .collect();
+ let mut available_executors =
self.state.get_available_executors_data();
// In case of there's no enough resources, reschedule the tasks of the
job
if available_executors.is_empty() {
Review comment:
Agreed. Since the inner sleep is async, it will not block the execution
threads, so there is currently no need to spawn another thread for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]