This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb7520  Add some resiliency to lost executors (#568)
ccb7520 is described below

commit ccb75200397a89fdc9ebe8294ae1521a3e94485b
Author: Ximo Guanter <[email protected]>
AuthorDate: Sat Jun 26 15:39:33 2021 +0200

    Add some resiliency to lost executors (#568)
---
 ballista/rust/core/proto/ballista.proto         |  10 +-
 ballista/rust/executor/src/execution_loop.rs    |  11 +-
 ballista/rust/scheduler/src/api/handlers.rs     |  39 +++----
 ballista/rust/scheduler/src/lib.rs              |  42 +-------
 ballista/rust/scheduler/src/state/etcd.rs       |  27 +----
 ballista/rust/scheduler/src/state/mod.rs        | 131 ++++++++++++++++++------
 ballista/rust/scheduler/src/state/standalone.rs |  24 ++---
 7 files changed, 144 insertions(+), 140 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index d75cbaa..365d8e9 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -745,10 +745,10 @@ message ExecutorRegistration {
   uint32 port = 3;
 }
 
-message GetExecutorMetadataParams {}
-
-message GetExecutorMetadataResult {
-  repeated ExecutorMetadata metadata = 1;
+message ExecutorHeartbeat {
+  ExecutorMetadata meta = 1;
+  // Unix epoch-based timestamp in seconds
+  uint64 timestamp = 2;
 }
 
 message RunningTask {
@@ -847,8 +847,6 @@ message FilePartitionMetadata {
 }
 
 service SchedulerGrpc {
-  rpc GetExecutorsMetadata (GetExecutorMetadataParams) returns (GetExecutorMetadataResult) {}
-
   // Executors must poll the scheduler for heartbeat and to receive tasks
   rpc PollWork (PollWorkParams) returns (PollWorkResult) {}
 
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 6eb4713..17a8d8c 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -91,10 +91,14 @@ async fn run_received_tasks(
     task_status_sender: Sender<TaskStatus>,
     task: TaskDefinition,
 ) {
-    info!("Received task {:?}", task.task_id.as_ref().unwrap());
+    let task_id = task.task_id.unwrap();
+    let task_id_log = format!(
+        "{}/{}/{}",
+        task_id.job_id, task_id.stage_id, task_id.partition_id
+    );
+    info!("Received task {}", task_id_log);
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
     let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
-    let task_id = task.task_id.unwrap();
 
     tokio::spawn(async move {
         let execution_result = executor
@@ -105,7 +109,8 @@ async fn run_received_tasks(
                 plan,
             )
             .await;
-        info!("DONE WITH TASK: {:?}", execution_result);
+        info!("Done with task {}", task_id_log);
+        debug!("Statistics: {:?}", execution_result);
         available_tasks_slots.fetch_add(1, Ordering::SeqCst);
         let _ = task_status_sender.send(as_task_status(
             execution_result.map(|_| ()),
diff --git a/ballista/rust/scheduler/src/api/handlers.rs b/ballista/rust/scheduler/src/api/handlers.rs
index 7293558..ee0ee73 100644
--- a/ballista/rust/scheduler/src/api/handlers.rs
+++ b/ballista/rust/scheduler/src/api/handlers.rs
@@ -11,45 +11,32 @@
 // limitations under the License.
 
 use crate::SchedulerServer;
-use ballista_core::serde::protobuf::{
-    scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams,
-    GetExecutorMetadataResult,
-};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use tonic::{Request, Response};
+use ballista_core::{serde::scheduler::ExecutorMeta, BALLISTA_VERSION};
 use warp::Rejection;
 
 #[derive(Debug, serde::Serialize)]
 struct StateResponse {
     executors: Vec<ExecutorMeta>,
     started: u128,
-    version: String,
+    version: &'static str,
 }
 
 pub(crate) async fn scheduler_state(
     data_server: SchedulerServer,
 ) -> Result<impl warp::Reply, Rejection> {
-    let data: Result<Response<GetExecutorMetadataResult>, tonic::Status> = data_server
-        .get_executors_metadata(Request::new(GetExecutorMetadataParams {}))
-        .await;
-    let metadata: Vec<ExecutorMeta> = match data {
-        Ok(result) => {
-            let res: &GetExecutorMetadataResult = result.get_ref();
-            let vec: &Vec<ExecutorMetadata> = &res.metadata;
-            vec.iter()
-                .map(|v: &ExecutorMetadata| ExecutorMeta {
-                    host: v.host.clone(),
-                    port: v.port as u16,
-                    id: v.id.clone(),
-                })
-                .collect()
-        }
-        Err(_) => vec![],
-    };
+    // TODO: Display last seen information in UI
+    let executors: Vec<ExecutorMeta> = data_server
+        .state
+        .get_executors_metadata()
+        .await
+        .unwrap_or_default()
+        .into_iter()
+        .map(|(metadata, _duration)| metadata)
+        .collect();
     let response = StateResponse {
-        executors: metadata,
+        executors,
         started: data_server.start_time,
-        version: data_server.version.clone(),
+        version: BALLISTA_VERSION,
     };
     Ok(warp::reply::json(&response))
 }
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index 54cba48..3620f79 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -34,10 +34,10 @@ use std::{fmt, net::IpAddr};
 use ballista_core::serde::protobuf::{
     execute_query_params::Query, executor_registration::OptionalHost, job_status,
     scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
-    FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
-    GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
-    GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
-    PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
+    FailedJob, FilePartitionMetadata, FileType, GetFileMetadataParams,
+    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
+    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
+    TaskStatus,
 };
 use ballista_core::serde::scheduler::ExecutorMeta;
 
@@ -76,9 +76,8 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 #[derive(Clone)]
 pub struct SchedulerServer {
     caller_ip: IpAddr,
-    state: Arc<SchedulerState>,
+    pub(crate) state: Arc<SchedulerState>,
     start_time: u128,
-    version: String,
 }
 
 impl SchedulerServer {
@@ -87,7 +86,6 @@ impl SchedulerServer {
         namespace: String,
         caller_ip: IpAddr,
     ) -> Self {
-        const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
         let state = Arc::new(SchedulerState::new(config, namespace));
         let state_clone = state.clone();
 
@@ -101,35 +99,12 @@ impl SchedulerServer {
                 .duration_since(UNIX_EPOCH)
                 .unwrap()
                 .as_millis(),
-            version: VERSION.unwrap_or("Unknown").to_string(),
         }
     }
 }
 
 #[tonic::async_trait]
 impl SchedulerGrpc for SchedulerServer {
-    async fn get_executors_metadata(
-        &self,
-        _request: Request<GetExecutorMetadataParams>,
-    ) -> std::result::Result<Response<GetExecutorMetadataResult>, tonic::Status> {
-        info!("Received get_executors_metadata request");
-        let result = self
-            .state
-            .get_executors_metadata()
-            .await
-            .map_err(|e| {
-                let msg = format!("Error reading executors metadata: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?
-            .into_iter()
-            .map(|meta| meta.into())
-            .collect();
-        Ok(Response::new(GetExecutorMetadataResult {
-            metadata: result,
-        }))
-    }
-
     async fn poll_work(
         &self,
         request: Request<PollWorkParams>,
@@ -279,13 +254,6 @@ impl SchedulerGrpc for SchedulerServer {
                 }
             };
             debug!("Received plan for execution: {:?}", plan);
-            let executors = self.state.get_executors_metadata().await.map_err(|e| {
-                let msg = format!("Error reading executors metadata: {}", e);
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?;
-            debug!("Found executors: {:?}", executors);
-
             let job_id: String = {
                 let mut rng = thread_rng();
                 std::iter::repeat(())
diff --git a/ballista/rust/scheduler/src/state/etcd.rs b/ballista/rust/scheduler/src/state/etcd.rs
index 807477d..d6741a7 100644
--- a/ballista/rust/scheduler/src/state/etcd.rs
+++ b/ballista/rust/scheduler/src/state/etcd.rs
@@ -17,14 +17,12 @@
 
 //! Etcd config backend.
 
-use std::{task::Poll, time::Duration};
+use std::task::Poll;
 
 use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, Result};
 
-use etcd_client::{
-    GetOptions, LockResponse, PutOptions, WatchOptions, WatchStream, Watcher,
-};
+use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher};
 use futures::{Stream, StreamExt};
 use log::warn;
 
@@ -70,25 +68,9 @@ impl ConfigBackendClient for EtcdClient {
             .collect())
     }
 
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        lease_time: Option<Duration>,
-    ) -> Result<()> {
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
         let mut etcd = self.etcd.clone();
-        let put_options = if let Some(lease_time) = lease_time {
-            etcd.lease_grant(lease_time.as_secs() as i64, None)
-                .await
-                .map(|lease| Some(PutOptions::new().with_lease(lease.id())))
-                .map_err(|e| {
-                    warn!("etcd lease grant failed: {:?}", e.to_string());
-                    ballista_error("etcd lease grant failed")
-                })?
-        } else {
-            None
-        };
-        etcd.put(key.clone(), value.clone(), put_options)
+        etcd.put(key.clone(), value.clone(), None)
             .await
             .map_err(|e| {
                 warn!("etcd put failed: {}", e);
@@ -99,6 +81,7 @@ impl ConfigBackendClient for EtcdClient {
 
     async fn lock(&self) -> Result<Box<dyn Lock>> {
         let mut etcd = self.etcd.clone();
+        // TODO: make this a namespaced-lock
         let lock = etcd
             .lock("/ballista_global_lock", None)
             .await
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index 75f1574..a17c82d 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::time::{SystemTime, UNIX_EPOCH};
 use std::{
     any::type_name, collections::HashMap, convert::TryInto, sync::Arc, time::Duration,
 };
@@ -26,8 +27,9 @@ use prost::Message;
 use tokio::sync::OwnedMutexGuard;
 
 use ballista_core::serde::protobuf::{
-    job_status, task_status, CompletedJob, CompletedTask, ExecutorMetadata, FailedJob,
-    FailedTask, JobStatus, PhysicalPlanNode, RunningJob, RunningTask, TaskStatus,
+    job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat,
+    ExecutorMetadata, FailedJob, FailedTask, JobStatus, PhysicalPlanNode, RunningJob,
+    RunningTask, TaskStatus,
 };
 use ballista_core::serde::scheduler::PartitionStats;
 use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta};
@@ -48,8 +50,6 @@ pub use etcd::EtcdClient;
 #[cfg(feature = "sled")]
 pub use standalone::StandaloneClient;
 
-const LEASE_TIME: Duration = Duration::from_secs(60);
-
 /// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster.
 #[tonic::async_trait]
 pub trait ConfigBackendClient: Send + Sync {
@@ -62,12 +62,7 @@ pub trait ConfigBackendClient: Send + Sync {
     async fn get_from_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
 
     /// Saves the value into the provided key, overriding any previous data that might have been associated to that key.
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        lease_time: Option<Duration>,
-    ) -> Result<()>;
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()>;
 
     async fn lock(&self) -> Result<Box<dyn Lock>>;
 
@@ -104,25 +99,55 @@ impl SchedulerState {
         }
     }
 
-    pub async fn get_executors_metadata(&self) -> Result<Vec<ExecutorMeta>> {
+    pub async fn get_executors_metadata(&self) -> Result<Vec<(ExecutorMeta, Duration)>> {
         let mut result = vec![];
 
         let entries = self
             .config_client
             .get_from_prefix(&get_executors_prefix(&self.namespace))
             .await?;
+        let now_epoch_ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
         for (_key, entry) in entries {
-            let meta: ExecutorMetadata = decode_protobuf(&entry)?;
-            result.push(meta.into());
+            let heartbeat: ExecutorHeartbeat = decode_protobuf(&entry)?;
+            let meta = heartbeat.meta.unwrap();
+            let ts = Duration::from_secs(heartbeat.timestamp);
+            let time_since_last_seen = now_epoch_ts
+                .checked_sub(ts)
+                .unwrap_or_else(|| Duration::from_secs(0));
+            result.push((meta.into(), time_since_last_seen));
         }
         Ok(result)
     }
 
+    pub async fn get_alive_executors_metadata(
+        &self,
+        last_seen_threshold: Duration,
+    ) -> Result<Vec<ExecutorMeta>> {
+        Ok(self
+            .get_executors_metadata()
+            .await?
+            .into_iter()
+            .filter_map(|(exec, last_seen)| {
+                (last_seen < last_seen_threshold).then(|| exec)
+            })
+            .collect())
+    }
+
     pub async fn save_executor_metadata(&self, meta: ExecutorMeta) -> Result<()> {
         let key = get_executor_key(&self.namespace, &meta.id);
         let meta: ExecutorMetadata = meta.into();
-        let value: Vec<u8> = encode_protobuf(&meta)?;
-        self.config_client.put(key, value, Some(LEASE_TIME)).await
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_secs();
+        let heartbeat = ExecutorHeartbeat {
+            meta: Some(meta),
+            timestamp,
+        };
+        let value: Vec<u8> = encode_protobuf(&heartbeat)?;
+        self.config_client.put(key, value).await
     }
 
     pub async fn save_job_metadata(
@@ -133,7 +158,7 @@ impl SchedulerState {
         debug!("Saving job metadata: {:?}", status);
         let key = get_job_key(&self.namespace, job_id);
         let value = encode_protobuf(status)?;
-        self.config_client.put(key, value, None).await
+        self.config_client.put(key, value).await
     }
 
     pub async fn get_job_metadata(&self, job_id: &str) -> Result<JobStatus> {
@@ -158,7 +183,7 @@ impl SchedulerState {
             partition_id.partition_id as usize,
         );
         let value = encode_protobuf(status)?;
-        self.config_client.put(key, value, None).await
+        self.config_client.put(key, value).await
     }
 
     pub async fn _get_task_status(
@@ -191,7 +216,7 @@ impl SchedulerState {
             let proto: PhysicalPlanNode = plan.try_into()?;
             encode_protobuf(&proto)?
         };
-        self.config_client.clone().put(key, value, None).await
+        self.config_client.clone().put(key, value).await
     }
 
     pub async fn get_stage_plan(
@@ -211,6 +236,40 @@ impl SchedulerState {
         Ok((&value).try_into()?)
     }
 
+    /// This function ensures that the task wasn't assigned to an executor that died.
+    /// If that is the case, then the task is re-scheduled.
+    /// Returns true if the task was dead, false otherwise.
+    async fn reschedule_dead_task(
+        &self,
+        task_status: &TaskStatus,
+        executors: &[ExecutorMeta],
+    ) -> Result<bool> {
+        let executor_id: &str = match &task_status.status {
+            Some(task_status::Status::Completed(CompletedTask { executor_id })) => {
+                executor_id
+            }
+            Some(task_status::Status::Running(RunningTask { executor_id })) => {
+                executor_id
+            }
+            _ => return Ok(false),
+        };
+        let executor_meta = executors.iter().find(|exec| exec.id == executor_id);
+        let task_is_dead = executor_meta.is_none();
+        if task_is_dead {
+            info!(
+                "Executor {} isn't alive. Rescheduling task {:?}",
+                executor_id,
+                task_status.partition_id.as_ref().unwrap()
+            );
+            // Task was handled in an executor that isn't alive anymore, so we can't resolve it
+            // We mark the task as pending again and continue
+            let mut task_status = task_status.clone();
+            task_status.status = None;
+            self.save_task_status(&task_status).await?;
+        }
+        Ok(task_is_dead)
+    }
+
     pub async fn assign_next_schedulable_task(
         &self,
         executor_id: &str,
@@ -221,7 +280,10 @@ impl SchedulerState {
             .await?
             .into_iter()
             .collect();
-        let executors = self.get_executors_metadata().await?;
+        // TODO: Make the duration a configurable parameter
+        let executors = self
+            .get_alive_executors_metadata(Duration::from_secs(60))
+            .await?;
         'tasks: for (_key, value) in kvs.iter() {
             let mut status: TaskStatus = decode_protobuf(value)?;
             if status.status.is_none() {
@@ -249,13 +311,23 @@ impl SchedulerState {
                                 .unwrap();
                             let referenced_task: TaskStatus =
                                 decode_protobuf(referenced_task)?;
-                            if let Some(task_status::Status::Completed(CompletedTask {
-                                executor_id,
-                            })) = referenced_task.status
+                            let task_is_dead = self
+                                .reschedule_dead_task(&referenced_task, &executors)
+                                .await?;
+                            if task_is_dead {
+                                continue 'tasks;
+                            } else if let Some(task_status::Status::Completed(
+                                CompletedTask { executor_id },
+                            )) = referenced_task.status
                             {
                                 let empty = vec![];
                                 let locations =
                                     partition_locations.entry(stage_id).or_insert(empty);
+                                let executor_meta = executors
+                                    .iter()
+                                    .find(|exec| exec.id == executor_id)
+                                    .unwrap()
+                                    .clone();
                                 locations.push(vec![
                                     ballista_core::serde::scheduler::PartitionLocation {
                                         partition_id:
@@ -264,11 +336,7 @@ impl SchedulerState {
                                                 stage_id,
                                                 partition_id,
                                             },
-                                        executor_meta: executors
-                                            .iter()
-                                            .find(|exec| exec.id == executor_id)
-                                            .unwrap()
-                                            .clone(),
+                                        executor_meta,
                                         partition_stats: PartitionStats::default(),
                                     },
                                 ]);
@@ -336,7 +404,7 @@ impl SchedulerState {
             .get_executors_metadata()
             .await?
             .into_iter()
-            .map(|meta| (meta.id.to_string(), meta))
+            .map(|(meta, _)| (meta.id.to_string(), meta))
             .collect();
         let status: JobStatus = decode_protobuf(&value)?;
         let new_status = self.get_job_status_from_tasks(job_id, &executors).await?;
@@ -553,7 +621,12 @@ mod test {
             port: 123,
         };
         state.save_executor_metadata(meta.clone()).await?;
-        let result = state.get_executors_metadata().await?;
+        let result: Vec<_> = state
+            .get_executors_metadata()
+            .await?
+            .into_iter()
+            .map(|(meta, _)| meta)
+            .collect();
         assert_eq!(vec![meta], result);
         Ok(())
     }
diff --git a/ballista/rust/scheduler/src/state/standalone.rs b/ballista/rust/scheduler/src/state/standalone.rs
index 69805c0..8514d4c 100644
--- a/ballista/rust/scheduler/src/state/standalone.rs
+++ b/ballista/rust/scheduler/src/state/standalone.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{sync::Arc, task::Poll, time::Duration};
+use std::{sync::Arc, task::Poll};
 
 use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, BallistaError, Result};
@@ -89,13 +89,7 @@ impl ConfigBackendClient for StandaloneClient {
             .map_err(|e| ballista_error(&format!("sled error {:?}", e)))?)
     }
 
-    // TODO: support lease_time. See https://github.com/spacejam/sled/issues/1119 for how to approach this
-    async fn put(
-        &self,
-        key: String,
-        value: Vec<u8>,
-        _lease_time: Option<Duration>,
-    ) -> Result<()> {
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()> {
         self.db
             .insert(key, value)
             .map_err(|e| {
@@ -170,7 +164,7 @@ mod tests {
         let client = create_instance()?;
         let key = "key";
         let value = "value".as_bytes();
-        client.put(key.to_owned(), value.to_vec(), None).await?;
+        client.put(key.to_owned(), value.to_vec()).await?;
         assert_eq!(client.get(key).await?, value);
         Ok(())
     }
@@ -189,12 +183,8 @@ mod tests {
         let client = create_instance()?;
         let key = "key";
         let value = "value".as_bytes();
-        client
-            .put(format!("{}/1", key), value.to_vec(), None)
-            .await?;
-        client
-            .put(format!("{}/2", key), value.to_vec(), None)
-            .await?;
+        client.put(format!("{}/1", key), value.to_vec()).await?;
+        client.put(format!("{}/2", key), value.to_vec()).await?;
         assert_eq!(
             client.get_from_prefix(key).await?,
             vec![
@@ -211,13 +201,13 @@ mod tests {
         let key = "key";
         let value = "value".as_bytes();
         let mut watch: Box<dyn Watch> = client.watch(key.to_owned()).await?;
-        client.put(key.to_owned(), value.to_vec(), None).await?;
+        client.put(key.to_owned(), value.to_vec()).await?;
         assert_eq!(
             watch.next().await,
             Some(WatchEvent::Put(key.to_owned(), value.to_owned()))
         );
         let value2 = "value2".as_bytes();
-        client.put(key.to_owned(), value2.to_vec(), None).await?;
+        client.put(key.to_owned(), value2.to_vec()).await?;
         assert_eq!(
             watch.next().await,
             Some(WatchEvent::Put(key.to_owned(), value2.to_owned()))

Reply via email to