This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ccb7520 Add some resiliency to lost executors (#568)
ccb7520 is described below
commit ccb75200397a89fdc9ebe8294ae1521a3e94485b
Author: Ximo Guanter <[email protected]>
AuthorDate: Sat Jun 26 15:39:33 2021 +0200
Add some resiliency to lost executors (#568)
---
ballista/rust/core/proto/ballista.proto | 10 +-
ballista/rust/executor/src/execution_loop.rs | 11 +-
ballista/rust/scheduler/src/api/handlers.rs | 39 +++----
ballista/rust/scheduler/src/lib.rs | 42 +-------
ballista/rust/scheduler/src/state/etcd.rs | 27 +----
ballista/rust/scheduler/src/state/mod.rs | 131 ++++++++++++++++++------
ballista/rust/scheduler/src/state/standalone.rs | 24 ++---
7 files changed, 144 insertions(+), 140 deletions(-)
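In short: the dedicated GetExecutorsMetadata RPC and the etcd lease on executor registrations are removed. Instead, the scheduler records an ExecutorHeartbeat (the executor metadata plus a Unix-epoch timestamp in seconds) every time executor metadata is saved, and task scheduling only considers executors whose last heartbeat is recent. The snippet below is a simplified, self-contained sketch of that liveness check, not the committed code: the plain structs stand in for the protobuf-generated ExecutorHeartbeat/ExecutorMeta types, the function name alive_executors is illustrative, and the 60-second threshold mirrors the hard-coded value used in assign_next_schedulable_task.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Stand-ins for the protobuf-generated types in ballista.proto (illustrative only).
struct ExecutorMeta { id: String }
struct ExecutorHeartbeat { meta: ExecutorMeta, timestamp: u64 } // seconds since Unix epoch

// Same idea as the new SchedulerState::get_alive_executors_metadata: compute how long
// ago each heartbeat was recorded and keep only executors seen within the threshold.
fn alive_executors(heartbeats: Vec<ExecutorHeartbeat>, threshold: Duration) -> Vec<ExecutorMeta> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    heartbeats
        .into_iter()
        .filter_map(|hb| {
            let last_seen = now
                .checked_sub(Duration::from_secs(hb.timestamp))
                .unwrap_or_else(|| Duration::from_secs(0));
            (last_seen < threshold).then(|| hb.meta)
        })
        .collect()
}

fn main() {
    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    let heartbeats = vec![
        ExecutorHeartbeat { meta: ExecutorMeta { id: "exec-1".into() }, timestamp: now },
        ExecutorHeartbeat { meta: ExecutorMeta { id: "exec-2".into() }, timestamp: now - 120 },
    ];
    // With the 60s threshold used by the scheduler, only exec-1 counts as alive.
    for meta in alive_executors(heartbeats, Duration::from_secs(60)) {
        println!("alive executor: {}", meta.id);
    }
}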
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index d75cbaa..365d8e9 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -745,10 +745,10 @@ message ExecutorRegistration {
uint32 port = 3;
}
-message GetExecutorMetadataParams {}
-
-message GetExecutorMetadataResult {
- repeated ExecutorMetadata metadata = 1;
+message ExecutorHeartbeat {
+ ExecutorMetadata meta = 1;
+ // Unix epoch-based timestamp in seconds
+ uint64 timestamp = 2;
}
message RunningTask {
@@ -847,8 +847,6 @@ message FilePartitionMetadata {
}
service SchedulerGrpc {
- rpc GetExecutorsMetadata (GetExecutorMetadataParams) returns (GetExecutorMetadataResult) {}
-
// Executors must poll the scheduler for heartbeat and to receive tasks
rpc PollWork (PollWorkParams) returns (PollWorkResult) {}
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 6eb4713..17a8d8c 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -91,10 +91,14 @@ async fn run_received_tasks(
task_status_sender: Sender<TaskStatus>,
task: TaskDefinition,
) {
- info!("Received task {:?}", task.task_id.as_ref().unwrap());
+ let task_id = task.task_id.unwrap();
+ let task_id_log = format!(
+ "{}/{}/{}",
+ task_id.job_id, task_id.stage_id, task_id.partition_id
+ );
+ info!("Received task {}", task_id_log);
available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
- let task_id = task.task_id.unwrap();
tokio::spawn(async move {
let execution_result = executor
@@ -105,7 +109,8 @@ async fn run_received_tasks(
plan,
)
.await;
- info!("DONE WITH TASK: {:?}", execution_result);
+ info!("Done with task {}", task_id_log);
+ debug!("Statistics: {:?}", execution_result);
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
let _ = task_status_sender.send(as_task_status(
execution_result.map(|_| ()),
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 7293558..ee0ee73 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,45 +11,32 @@
// limitations under the License.
use crate::SchedulerServer;
-use ballista_core::serde::protobuf::{
- scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams,
- GetExecutorMetadataResult,
-};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use tonic::{Request, Response};
+use ballista_core::{serde::scheduler::ExecutorMeta, BALLISTA_VERSION};
use warp::Rejection;
#[derive(Debug, serde::Serialize)]
struct StateResponse {
executors: Vec<ExecutorMeta>,
started: u128,
- version: String,
+ version: &'static str,
}
pub(crate) async fn scheduler_state(
data_server: SchedulerServer,
) -> Result<impl warp::Reply, Rejection> {
- let data: Result<Response<GetExecutorMetadataResult>, tonic::Status> = data_server
- .get_executors_metadata(Request::new(GetExecutorMetadataParams {}))
- .await;
- let metadata: Vec<ExecutorMeta> = match data {
- Ok(result) => {
- let res: &GetExecutorMetadataResult = result.get_ref();
- let vec: &Vec<ExecutorMetadata> = &res.metadata;
- vec.iter()
- .map(|v: &ExecutorMetadata| ExecutorMeta {
- host: v.host.clone(),
- port: v.port as u16,
- id: v.id.clone(),
- })
- .collect()
- }
- Err(_) => vec![],
- };
+ // TODO: Display last seen information in UI
+ let executors: Vec<ExecutorMeta> = data_server
+ .state
+ .get_executors_metadata()
+ .await
+ .unwrap_or_default()
+ .into_iter()
+ .map(|(metadata, _duration)| metadata)
+ .collect();
let response = StateResponse {
- executors: metadata,
+ executors,
started: data_server.start_time,
- version: data_server.version.clone(),
+ version: BALLISTA_VERSION,
};
Ok(warp::reply::json(&response))
}
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 54cba48..3620f79 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -34,10 +34,10 @@ use std::{fmt, net::IpAddr};
use ballista_core::serde::protobuf::{
execute_query_params::Query, executor_registration::OptionalHost, job_status,
scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
- FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
- GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
- GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
- PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
+ FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
+ GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
+ PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
+ TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorMeta;
@@ -76,9 +76,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
#[derive(Clone)]
pub struct SchedulerServer {
caller_ip: IpAddr,
- state: Arc<SchedulerState>,
+ pub(crate) state: Arc<SchedulerState>,
start_time: u128,
- version: String,
}
impl SchedulerServer {
@@ -87,7 +86,6 @@ impl SchedulerServer {
namespace: String,
caller_ip: IpAddr,
) -> Self {
- const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
let state = Arc::new(SchedulerState::new(config, namespace));
let state_clone = state.clone();
@@ -101,35 +99,12 @@ impl SchedulerServer {
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
- version: VERSION.unwrap_or("Unknown").to_string(),
}
}
}
#[tonic::async_trait]
impl SchedulerGrpc for SchedulerServer {
- async fn get_executors_metadata(
- &self,
- _request: Request<GetExecutorMetadataParams>,
- ) -> std::result::Result<Response<GetExecutorMetadataResult>, tonic::Status> {
- info!("Received get_executors_metadata request");
- let result = self
- .state
- .get_executors_metadata()
- .await
- .map_err(|e| {
- let msg = format!("Error reading executors metadata: {}", e);
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?
- .into_iter()
- .map(|meta| meta.into())
- .collect();
- Ok(Response::new(GetExecutorMetadataResult {
- metadata: result,
- }))
- }
-
async fn poll_work(
&self,
request: Request<PollWorkParams>,
@@ -279,13 +254,6 @@ impl SchedulerGrpc for SchedulerServer {
}
};
debug!("Received plan for execution: {:?}", plan);
- let executors = self.state.get_executors_metadata().await.map_err(|e| {
- let msg = format!("Error reading executors metadata: {}", e);
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?;
- debug!("Found executors: {:?}", executors);
-
let job_id: String = {
let mut rng = thread_rng();
std::iter::repeat(())
diff --git a/ballista/rust/scheduler/src/state/etcd.rs b/ballista/rust/scheduler/src/state/etcd.rs
index 807477d..d6741a7 100644
--- a/ballista/rust/scheduler/src/state/etcd.rs
+++ b/ballista/rust/scheduler/src/state/etcd.rs
@@ -17,14 +17,12 @@
//! Etcd config backend.
-use std::{task::Poll, time::Duration};
+use std::task::Poll;
use crate::state::ConfigBackendClient;
use ballista_core::error::{ballista_error, Result};
-use etcd_client::{
- GetOptions, LockResponse, PutOptions, WatchOptions, WatchStream, Watcher,
-};
+use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher};
use futures::{Stream, StreamExt};
use log::warn;
@@ -70,25 +68,9 @@ impl ConfigBackendClient for EtcdClient {
.collect())
}
- async fn put(
- &self,
- key: String,
- value: Vec<u8>,
- lease_time: Option<Duration>,
- ) -> Result<()> {
+ async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
let mut etcd = self.etcd.clone();
- let put_options = if let Some(lease_time) = lease_time {
- etcd.lease_grant(lease_time.as_secs() as i64, None)
- .await
- .map(|lease| Some(PutOptions::new().with_lease(lease.id())))
- .map_err(|e| {
- warn!("etcd lease grant failed: {:?}", e.to_string());
- ballista_error("etcd lease grant failed")
- })?
- } else {
- None
- };
- etcd.put(key.clone(), value.clone(), put_options)
+ etcd.put(key.clone(), value.clone(), None)
.await
.map_err(|e| {
warn!("etcd put failed: {}", e);
@@ -99,6 +81,7 @@ impl ConfigBackendClient for EtcdClient {
async fn lock(&self) -> Result<Box<dyn Lock>> {
let mut etcd = self.etcd.clone();
+ // TODO: make this a namespaced-lock
let lock = etcd
.lock("/ballista_global_lock", None)
.await
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 75f1574..a17c82d 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::time::{SystemTime, UNIX_EPOCH};
use std::{
any::type_name, collections::HashMap, convert::TryInto, sync::Arc, time::Duration,
};
@@ -26,8 +27,9 @@ use prost::Message;
use tokio::sync::OwnedMutexGuard;
use ballista_core::serde::protobuf::{
- job_status, task_status, CompletedJob, CompletedTask, ExecutorMetadata, FailedJob,
- FailedTask, JobStatus, PhysicalPlanNode, RunningJob, RunningTask, TaskStatus,
+ job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat,
+ ExecutorMetadata, FailedJob, FailedTask, JobStatus, PhysicalPlanNode, RunningJob,
+ RunningTask, TaskStatus,
};
use ballista_core::serde::scheduler::PartitionStats;
use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta};
@@ -48,8 +50,6 @@ pub use etcd::EtcdClient;
#[cfg(feature = "sled")]
pub use standalone::StandaloneClient;
-const LEASE_TIME: Duration = Duration::from_secs(60);
-
/// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster.
#[tonic::async_trait]
pub trait ConfigBackendClient: Send + Sync {
@@ -62,12 +62,7 @@ pub trait ConfigBackendClient: Send + Sync {
async fn get_from_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
/// Saves the value into the provided key, overriding any previous data that might have been associated to that key.
- async fn put(
- &self,
- key: String,
- value: Vec<u8>,
- lease_time: Option<Duration>,
- ) -> Result<()>;
+ async fn put(&self, key: String, value: Vec<u8>) -> Result<()>;
async fn lock(&self) -> Result<Box<dyn Lock>>;
@@ -104,25 +99,55 @@ impl SchedulerState {
}
}
- pub async fn get_executors_metadata(&self) -> Result<Vec<ExecutorMeta>> {
+ pub async fn get_executors_metadata(&self) -> Result<Vec<(ExecutorMeta, Duration)>> {
let mut result = vec![];
let entries = self
.config_client
.get_from_prefix(&get_executors_prefix(&self.namespace))
.await?;
+ let now_epoch_ts = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards");
for (_key, entry) in entries {
- let meta: ExecutorMetadata = decode_protobuf(&entry)?;
- result.push(meta.into());
+ let heartbeat: ExecutorHeartbeat = decode_protobuf(&entry)?;
+ let meta = heartbeat.meta.unwrap();
+ let ts = Duration::from_secs(heartbeat.timestamp);
+ let time_since_last_seen = now_epoch_ts
+ .checked_sub(ts)
+ .unwrap_or_else(|| Duration::from_secs(0));
+ result.push((meta.into(), time_since_last_seen));
}
Ok(result)
}
+ pub async fn get_alive_executors_metadata(
+ &self,
+ last_seen_threshold: Duration,
+ ) -> Result<Vec<ExecutorMeta>> {
+ Ok(self
+ .get_executors_metadata()
+ .await?
+ .into_iter()
+ .filter_map(|(exec, last_seen)| {
+ (last_seen < last_seen_threshold).then(|| exec)
+ })
+ .collect())
+ }
+
+ pub async fn save_executor_metadata(&self, meta: ExecutorMeta) -> Result<()> {
let key = get_executor_key(&self.namespace, &meta.id);
let meta: ExecutorMetadata = meta.into();
- let value: Vec<u8> = encode_protobuf(&meta)?;
- self.config_client.put(key, value, Some(LEASE_TIME)).await
+ let timestamp = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Time went backwards")
+ .as_secs();
+ let heartbeat = ExecutorHeartbeat {
+ meta: Some(meta),
+ timestamp,
+ };
+ let value: Vec<u8> = encode_protobuf(&heartbeat)?;
+ self.config_client.put(key, value).await
}
pub async fn save_job_metadata(
@@ -133,7 +158,7 @@ impl SchedulerState {
debug!("Saving job metadata: {:?}", status);
let key = get_job_key(&self.namespace, job_id);
let value = encode_protobuf(status)?;
- self.config_client.put(key, value, None).await
+ self.config_client.put(key, value).await
}
pub async fn get_job_metadata(&self, job_id: &str) -> Result<JobStatus> {
@@ -158,7 +183,7 @@ impl SchedulerState {
partition_id.partition_id as usize,
);
let value = encode_protobuf(status)?;
- self.config_client.put(key, value, None).await
+ self.config_client.put(key, value).await
}
pub async fn _get_task_status(
@@ -191,7 +216,7 @@ impl SchedulerState {
let proto: PhysicalPlanNode = plan.try_into()?;
encode_protobuf(&proto)?
};
- self.config_client.clone().put(key, value, None).await
+ self.config_client.clone().put(key, value).await
}
pub async fn get_stage_plan(
@@ -211,6 +236,40 @@ impl SchedulerState {
Ok((&value).try_into()?)
}
+ /// This function ensures that the task wasn't assigned to an executor that died.
+ /// If that is the case, then the task is re-scheduled.
+ /// Returns true if the task was dead, false otherwise.
+ async fn reschedule_dead_task(
+ &self,
+ task_status: &TaskStatus,
+ executors: &[ExecutorMeta],
+ ) -> Result<bool> {
+ let executor_id: &str = match &task_status.status {
+ Some(task_status::Status::Completed(CompletedTask { executor_id })) => {
+ executor_id
+ }
+ Some(task_status::Status::Running(RunningTask { executor_id })) => {
+ executor_id
+ }
+ _ => return Ok(false),
+ };
+ let executor_meta = executors.iter().find(|exec| exec.id == executor_id);
+ let task_is_dead = executor_meta.is_none();
+ if task_is_dead {
+ info!(
+ "Executor {} isn't alive. Rescheduling task {:?}",
+ executor_id,
+ task_status.partition_id.as_ref().unwrap()
+ );
+ // Task was handled in an executor that isn't alive anymore, so we can't resolve it
+ // We mark the task as pending again and continue
+ let mut task_status = task_status.clone();
+ task_status.status = None;
+ self.save_task_status(&task_status).await?;
+ }
+ Ok(task_is_dead)
+ }
+
pub async fn assign_next_schedulable_task(
&self,
executor_id: &str,
@@ -221,7 +280,10 @@ impl SchedulerState {
.await?
.into_iter()
.collect();
- let executors = self.get_executors_metadata().await?;
+ // TODO: Make the duration a configurable parameter
+ let executors = self
+ .get_alive_executors_metadata(Duration::from_secs(60))
+ .await?;
'tasks: for (_key, value) in kvs.iter() {
let mut status: TaskStatus = decode_protobuf(value)?;
if status.status.is_none() {
@@ -249,13 +311,23 @@ impl SchedulerState {
.unwrap();
let referenced_task: TaskStatus = decode_protobuf(referenced_task)?;
- if let Some(task_status::Status::Completed(CompletedTask {
- executor_id,
- })) = referenced_task.status
+ let task_is_dead = self
+ .reschedule_dead_task(&referenced_task, &executors)
+ .await?;
+ if task_is_dead {
+ continue 'tasks;
+ } else if let Some(task_status::Status::Completed(
+ CompletedTask { executor_id },
+ )) = referenced_task.status
{
let empty = vec![];
let locations =
partition_locations.entry(stage_id).or_insert(empty);
+ let executor_meta = executors
+ .iter()
+ .find(|exec| exec.id == executor_id)
+ .unwrap()
+ .clone();
locations.push(vec![
ballista_core::serde::scheduler::PartitionLocation {
partition_id:
@@ -264,11 +336,7 @@ impl SchedulerState {
stage_id,
partition_id,
},
- executor_meta: executors
- .iter()
- .find(|exec| exec.id == executor_id)
- .unwrap()
- .clone(),
+ executor_meta,
partition_stats:
PartitionStats::default(),
},
]);
@@ -336,7 +404,7 @@ impl SchedulerState {
.get_executors_metadata()
.await?
.into_iter()
- .map(|meta| (meta.id.to_string(), meta))
+ .map(|(meta, _)| (meta.id.to_string(), meta))
.collect();
let status: JobStatus = decode_protobuf(&value)?;
let new_status = self.get_job_status_from_tasks(job_id, &executors).await?;
@@ -553,7 +621,12 @@ mod test {
port: 123,
};
state.save_executor_metadata(meta.clone()).await?;
- let result = state.get_executors_metadata().await?;
+ let result: Vec<_> = state
+ .get_executors_metadata()
+ .await?
+ .into_iter()
+ .map(|(meta, _)| meta)
+ .collect();
assert_eq!(vec![meta], result);
Ok(())
}
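The other half of the resiliency change is visible in assign_next_schedulable_task above: shuffle locations are only trusted when they come from executors in the alive set, and reschedule_dead_task resets any task pinned to a vanished executor back to pending so the work is done again. Below is a rough, self-contained sketch of that decision, not the committed code: the TaskState enum and reschedule_if_executor_lost are illustrative stand-ins for the protobuf TaskStatus/RunningTask/CompletedTask messages and the real method.

// Illustrative stand-in for the protobuf task status messages.
#[derive(Debug)]
enum TaskState {
    Pending,
    Running { executor_id: String },
    Completed { executor_id: String },
}

/// Mirrors reschedule_dead_task: if the task is pinned to an executor that is no
/// longer alive, mark it pending again and report that it was rescheduled.
fn reschedule_if_executor_lost(task: &mut TaskState, alive: &[String]) -> bool {
    let executor_id = match task {
        TaskState::Running { executor_id } | TaskState::Completed { executor_id } => {
            executor_id.clone()
        }
        TaskState::Pending => return false,
    };
    let executor_lost = !alive.iter().any(|id| *id == executor_id);
    if executor_lost {
        // The executor holding this task (or its output) is gone, so the result
        // cannot be resolved; schedule the task again from scratch.
        *task = TaskState::Pending;
    }
    executor_lost
}

fn main() {
    let alive = vec!["exec-1".to_string()];
    let mut task = TaskState::Completed { executor_id: "exec-2".to_string() };
    assert!(reschedule_if_executor_lost(&mut task, &alive));
    println!("task after liveness check: {:?}", task); // task is now Pending
}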
diff --git a/ballista/rust/scheduler/src/state/standalone.rs b/ballista/rust/scheduler/src/state/standalone.rs
index 69805c0..8514d4c 100644
--- a/ballista/rust/scheduler/src/state/standalone.rs
+++ b/ballista/rust/scheduler/src/state/standalone.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::{sync::Arc, task::Poll, time::Duration};
+use std::{sync::Arc, task::Poll};
use crate::state::ConfigBackendClient;
use ballista_core::error::{ballista_error, BallistaError, Result};
@@ -89,13 +89,7 @@ impl ConfigBackendClient for StandaloneClient {
.map_err(|e| ballista_error(&format!("sled error {:?}", e)))?)
}
- // TODO: support lease_time. See https://github.com/spacejam/sled/issues/1119 for how to approach this
- async fn put(
- &self,
- key: String,
- value: Vec<u8>,
- _lease_time: Option<Duration>,
- ) -> Result<()> {
+ async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
self.db
.insert(key, value)
.map_err(|e| {
@@ -170,7 +164,7 @@ mod tests {
let client = create_instance()?;
let key = "key";
let value = "value".as_bytes();
- client.put(key.to_owned(), value.to_vec(), None).await?;
+ client.put(key.to_owned(), value.to_vec()).await?;
assert_eq!(client.get(key).await?, value);
Ok(())
}
@@ -189,12 +183,8 @@ mod tests {
let client = create_instance()?;
let key = "key";
let value = "value".as_bytes();
- client
- .put(format!("{}/1", key), value.to_vec(), None)
- .await?;
- client
- .put(format!("{}/2", key), value.to_vec(), None)
- .await?;
+ client.put(format!("{}/1", key), value.to_vec()).await?;
+ client.put(format!("{}/2", key), value.to_vec()).await?;
assert_eq!(
client.get_from_prefix(key).await?,
vec![
@@ -211,13 +201,13 @@ mod tests {
let key = "key";
let value = "value".as_bytes();
let mut watch: Box<dyn Watch> = client.watch(key.to_owned()).await?;
- client.put(key.to_owned(), value.to_vec(), None).await?;
+ client.put(key.to_owned(), value.to_vec()).await?;
assert_eq!(
watch.next().await,
Some(WatchEvent::Put(key.to_owned(), value.to_owned()))
);
let value2 = "value2".as_bytes();
- client.put(key.to_owned(), value2.to_vec(), None).await?;
+ client.put(key.to_owned(), value2.to_vec()).await?;
assert_eq!(
watch.next().await,
Some(WatchEvent::Put(key.to_owned(), value2.to_owned()))