houqp commented on a change in pull request #1560:
URL: https://github.com/apache/arrow-datafusion/pull/1560#discussion_r787363383
##########
File path: ballista/rust/executor/src/main.rs
##########
@@ -94,32 +97,54 @@ async fn main() -> Result<()> {
.clone()
.map(executor_registration::OptionalHost::Host),
port: port as u32,
+ grpc_port: grpc_port as u32,
};
+ let executor_specification = ExecutorSpecification {
+ task_slots: opt.concurrent_tasks as u32,
+ };
+ let executor = Arc::new(Executor::new_with_specification(
+ &work_dir,
+ executor_specification,
+ ));
let scheduler = SchedulerGrpcClient::connect(scheduler_url)
.await
.context("Could not connect to scheduler")?;
- let executor = Arc::new(Executor::new(&work_dir));
-
- let service = BallistaFlightService::new(executor.clone());
+ let scheduler_policy = opt.task_scheduling_policy;
+ match scheduler_policy {
+ TaskSchedulingPolicy::PushStaged => {
+ tokio::spawn(executor_server::startup(
+ scheduler,
+ executor.clone(),
+ executor_meta,
+ ));
+ }
+ _ => {
+ tokio::spawn(execution_loop::poll_loop(
Review comment:
What's the reason for keeping the polling design around? I am a little
bit concerned about the complexity of maintaining these two models. From an end
user's point of view, which one should be chosen under which circumstances?
##########
File path: ballista/rust/executor/src/executor_server.rs
##########
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::convert::TryInto;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tokio::sync::mpsc;
+
+use log::{debug, info};
+use tonic::transport::{Channel, Server};
+use tonic::{Request, Response, Status};
+
+use ballista_core::error::BallistaError;
+use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
+use ballista_core::serde::protobuf::executor_grpc_server::{
+ ExecutorGrpc, ExecutorGrpcServer,
+};
+use ballista_core::serde::protobuf::executor_registration::OptionalHost;
+use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
+use ballista_core::serde::protobuf::{
+    ExecutorRegistration, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams,
+    SendHeartBeatParams, StopExecutorParams, StopExecutorResult, TaskDefinition,
+    UpdateTaskStatusParams,
+};
+use ballista_core::serde::scheduler::{ExecutorSpecification, ExecutorState};
+use datafusion::physical_plan::ExecutionPlan;
+
+use crate::as_task_status;
+use crate::executor::Executor;
+
+pub async fn startup(
+ mut scheduler: SchedulerGrpcClient<Channel>,
+ executor: Arc<Executor>,
+ executor_meta: ExecutorRegistration,
+) {
+ // TODO make the buffer size configurable
+ let (tx_task, rx_task) = mpsc::channel::<TaskDefinition>(1000);
+
+ let executor_server = ExecutorServer::new(
+ scheduler.clone(),
+ executor.clone(),
+ executor_meta.clone(),
+ ExecutorEnv { tx_task },
+ );
+
+ // 1. Start executor grpc service
+ {
+ let executor_meta = executor_meta.clone();
+ let addr = format!(
+ "{}:{}",
+ executor_meta
+ .optional_host
+ .map(|h| match h {
+ OptionalHost::Host(host) => host,
+ })
+ .unwrap_or_else(|| String::from("127.0.0.1")),
+ executor_meta.grpc_port
+ );
+ let addr = addr.parse().unwrap();
+ info!("Setup executor grpc service for {:?}", addr);
+
+ let server = ExecutorGrpcServer::new(executor_server.clone());
+        let grpc_server_future = Server::builder().add_service(server).serve(addr);
+ tokio::spawn(async move { grpc_server_future.await });
+ }
+
+ let executor_server = Arc::new(executor_server);
+
+ // 2. Do executor registration
+    match register_executor(&mut scheduler, &executor_meta, &executor.specification).await
+ {
+ Ok(_) => {
+ info!("Executor registration succeed");
+ }
+ Err(error) => {
+ panic!("Executor registration failed due to: {}", error);
+ }
+ };
+
+ // 3. Start Heartbeater
+ {
+ let heartbeater = Heartbeater::new(executor_server.clone());
+ heartbeater.start().await;
+ }
+
+ // 4. Start TaskRunnerPool
+ {
+ let task_runner_pool = TaskRunnerPool::new(executor_server.clone());
+ task_runner_pool.start(rx_task).await;
+ }
+}
+
+async fn register_executor(
+ scheduler: &mut SchedulerGrpcClient<Channel>,
+ executor_meta: &ExecutorRegistration,
+ specification: &ExecutorSpecification,
+) -> Result<(), BallistaError> {
+ let result = scheduler
+ .register_executor(RegisterExecutorParams {
+ metadata: Some(executor_meta.clone()),
+ specification: Some(specification.clone().into()),
+ })
+ .await?;
+ if result.into_inner().success {
+ Ok(())
+ } else {
+ Err(BallistaError::General(
+ "Executor registration failed!!!".to_owned(),
+ ))
+ }
+}
+
+#[derive(Clone)]
+pub struct ExecutorServer {
+ _start_time: u128,
+ executor: Arc<Executor>,
+ executor_meta: ExecutorRegistration,
+ scheduler: SchedulerGrpcClient<Channel>,
+ executor_env: ExecutorEnv,
+}
+
+#[derive(Clone)]
+struct ExecutorEnv {
+ tx_task: mpsc::Sender<TaskDefinition>,
+}
+
+unsafe impl Sync for ExecutorEnv {}
+
+impl ExecutorServer {
+ fn new(
+ scheduler: SchedulerGrpcClient<Channel>,
+ executor: Arc<Executor>,
+ executor_meta: ExecutorRegistration,
+ executor_env: ExecutorEnv,
+ ) -> Self {
+ Self {
+ _start_time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
+ executor,
+ executor_meta,
+ scheduler,
+ executor_env,
+ }
+ }
+
+ async fn heartbeat(&self) {
+ // TODO Error handling
+ self.scheduler
+ .clone()
+ .send_heart_beat(SendHeartBeatParams {
+ metadata: Some(self.executor_meta.clone()),
+ state: Some(self.get_executor_state().await.into()),
+ })
+ .await
+ .unwrap();
+ }
+
+    async fn run_task(&self, task: TaskDefinition) -> Result<(), BallistaError> {
+ let task_id = task.task_id.unwrap();
+ let task_id_log = format!(
+ "{}/{}/{}",
+ task_id.job_id, task_id.stage_id, task_id.partition_id
+ );
+ info!("Start to run task {}", task_id_log);
+
+ let plan: Arc<dyn ExecutionPlan> =
(&task.plan.unwrap()).try_into().unwrap();
+ let shuffle_output_partitioning =
+            parse_protobuf_hash_partitioning(task.output_partitioning.as_ref())?;
+
+ let execution_result = self
+ .executor
+ .execute_shuffle_write(
+ task_id.job_id.clone(),
+ task_id.stage_id as usize,
+ task_id.partition_id as usize,
+ plan,
+ shuffle_output_partitioning,
+ )
+ .await;
+ info!("Done with task {}", task_id_log);
+ debug!("Statistics: {:?}", execution_result);
+
+ // TODO use another channel to update the status of a task set
+ self.scheduler
+ .clone()
+ .update_task_status(UpdateTaskStatusParams {
Review comment:
I probably missed this in the code, but what happens when the executor
crashes and never reports an update for its assigned task? Not a blocker for
this PR, just curious how it's being handled.
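For the sake of discussion, here is a minimal sketch of the heartbeat-timeout
approach I'd expect to cover this; `LivenessTracker` and its methods are
hypothetical names, not code from this PR. The diff's
`get_alive_executors_metadata_within_one_minute` suggests the liveness half
already exists on the scheduler side, so maybe re-queuing the dead executor's
tasks is the missing piece:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical: the scheduler records the last heartbeat time per executor
// and treats anything older than the timeout as dead, so the tasks assigned
// to that executor can be rescheduled instead of waiting forever on a status
// update that will never arrive.
struct LivenessTracker {
    last_seen: HashMap<String, Instant>,
    timeout: Duration,
}

impl LivenessTracker {
    fn record_heartbeat(&mut self, executor_id: &str) {
        self.last_seen.insert(executor_id.to_owned(), Instant::now());
    }

    // Executor ids whose heartbeat is stale; their in-flight tasks should be
    // re-queued by the scheduler.
    fn expired_executors(&self) -> Vec<String> {
        self.last_seen
            .iter()
            .filter(|(_, seen)| seen.elapsed() > self.timeout)
            .map(|(id, _)| id.clone())
            .collect()
    }
}
```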
##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -112,17 +140,171 @@ impl SchedulerServer {
        tokio::spawn(async move { state_clone.synchronize_job_status_loop().await });
Self {
- caller_ip,
state,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
+ policy,
+ scheduler_env,
+ executors_client: Arc::new(RwLock::new(HashMap::new())),
}
}
- pub fn set_caller_ip(&mut self, ip: IpAddr) {
- self.caller_ip = ip;
+ async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> {
+ let alive_executors = self
+ .state
+ .get_alive_executors_metadata_within_one_minute()
+ .await?;
+ let alive_executors: HashMap<String, ExecutorMeta> = alive_executors
+ .into_iter()
+ .map(|e| (e.id.clone(), e))
+ .collect();
+        let available_executors = self.state.get_available_executors_data().await?;
+ let mut available_executors: Vec<ExecutorData> = available_executors
+ .into_iter()
+ .filter(|e| alive_executors.contains_key(&e.executor_id))
+ .collect();
+
+        // In case there are not enough resources, reschedule the tasks of the job
+ if available_executors.is_empty() {
+ let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
+ // TODO
+ tokio::spawn(async move {
+ warn!("Not enough available executors for task running");
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ tx_job.send(job_id).await.unwrap();
+ });
+ return Ok(());
+ }
+
+        let tasks_assigment = self.fetch_tasks(&mut available_executors, &job_id).await?;
+ if !tasks_assigment.is_empty() {
+ let available_executors: HashMap<String, ExecutorData> =
available_executors
Review comment:
Perhaps we can return the executor data (or an index into
`available_executors`) from `fetch_tasks` instead of the executor id, so we
won't need to go through `available_executors` again here to build the hashmap.
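Rough sketch of what I mean; the types and the `fetch_tasks` body below are
simplified stand-ins for illustration, not the real ones in this PR:

```rust
// Simplified stand-ins for the real types in this PR.
struct ExecutorData {
    executor_id: String,
    available_task_slots: u32,
}
struct TaskDefinition;

// Sketch: pair each batch of tasks with the index of the executor it was
// assigned to, so the caller can read the ExecutorData directly instead of
// rebuilding a HashMap<String, ExecutorData> keyed by executor id.
fn fetch_tasks(available_executors: &mut [ExecutorData]) -> Vec<(usize, Vec<TaskDefinition>)> {
    available_executors
        .iter_mut()
        .enumerate()
        .filter(|(_, executor)| executor.available_task_slots > 0)
        .map(|(idx, executor)| {
            // Consume one slot per assigned task in this toy example.
            executor.available_task_slots -= 1;
            (idx, vec![TaskDefinition])
        })
        .collect()
}
```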
##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -112,17 +140,171 @@ impl SchedulerServer {
        tokio::spawn(async move { state_clone.synchronize_job_status_loop().await });
Self {
- caller_ip,
state,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
+ policy,
+ scheduler_env,
+ executors_client: Arc::new(RwLock::new(HashMap::new())),
}
}
- pub fn set_caller_ip(&mut self, ip: IpAddr) {
- self.caller_ip = ip;
+ async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> {
+ let alive_executors = self
+ .state
+ .get_alive_executors_metadata_within_one_minute()
+ .await?;
+ let alive_executors: HashMap<String, ExecutorMeta> = alive_executors
+ .into_iter()
+ .map(|e| (e.id.clone(), e))
+ .collect();
+        let available_executors = self.state.get_available_executors_data().await?;
+ let mut available_executors: Vec<ExecutorData> = available_executors
+ .into_iter()
+ .filter(|e| alive_executors.contains_key(&e.executor_id))
+ .collect();
+
+        // In case there are not enough resources, reschedule the tasks of the job
+ if available_executors.is_empty() {
+ let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
+ // TODO
Review comment:
A TODO without actual content can be deleted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]