Ted-Jiang commented on a change in pull request #1810:
URL: https://github.com/apache/arrow-datafusion/pull/1810#discussion_r807581029



##########
File path: ballista/rust/scheduler/src/lib.rs
##########
@@ -606,120 +589,65 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         }
     }
 
-    async fn send_heart_beat(
+    async fn heart_beat_from_executor(
         &self,
-        request: Request<SendHeartBeatParams>,
-    ) -> Result<Response<SendHeartBeatResult>, Status> {
-        let remote_addr = request.remote_addr();
-        if let SendHeartBeatParams {
-            metadata: Some(metadata),
-            state: Some(state),
-        } = request.into_inner()
-        {
-            debug!("Received heart beat request for {:?}", metadata);
-            trace!("Related executor state is {:?}", state);
-            let metadata: ExecutorMeta = ExecutorMeta {
-                id: metadata.id,
-                host: metadata
-                    .optional_host
-                    .map(|h| match h {
-                        OptionalHost::Host(host) => host,
-                    })
-                    .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
-                port: metadata.port as u16,
-                grpc_port: metadata.grpc_port as u16,
-            };
-            {
-                let mut lock = self.state.lock().await.map_err(|e| {
-                    let msg = format!("Could not lock the state: {}", e);
-                    error!("{}", msg);
-                    tonic::Status::internal(msg)
-                })?;
-                self.state
-                    .save_executor_state(metadata, Some(state))
-                    .await
-                    .map_err(|e| {
-                        let msg = format!("Could not save executor metadata: {}", e);
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    })?;
-                lock.unlock().await;
-            }
-            Ok(Response::new(SendHeartBeatResult { reregister: false }))
-        } else {
-            warn!("Received invalid executor heart beat request");
-            Err(tonic::Status::invalid_argument(
-                "Missing metadata or metrics in request",
-            ))
-        }
+        request: Request<HeartBeatParams>,
+    ) -> Result<Response<HeartBeatResult>, Status> {
+        let HeartBeatParams { executor_id, state } = request.into_inner();
+
+        debug!("Received heart beat request for {:?}", executor_id);
+        trace!("Related executor state is {:?}", state);
+        let executor_heartbeat = ExecutorHeartbeat {
+            executor_id,
+            timestamp: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .expect("Time went backwards")
+                .as_secs(),
+            state,
+        };
+        self.state.save_executor_heartbeat(executor_heartbeat);
+        Ok(Response::new(HeartBeatResult { reregister: false }))
     }
 
     async fn update_task_status(
         &self,
         request: Request<UpdateTaskStatusParams>,
     ) -> Result<Response<UpdateTaskStatusResult>, Status> {
-        if let UpdateTaskStatusParams {
-            metadata: Some(metadata),
+        let UpdateTaskStatusParams {
+            executor_id,
             task_status,
-        } = request.into_inner()
+        } = request.into_inner();
+
+        debug!(
+            "Received task status update request for executor {:?}",
+            executor_id
+        );
+        trace!("Related task status is {:?}", task_status);
+        let mut jobs = HashSet::new();
         {
-            debug!("Received task status update request for {:?}", metadata);
-            trace!("Related task status is {:?}", task_status);
-            let mut jobs = HashSet::new();
-            {
-                let mut lock = self.state.lock().await.map_err(|e| {
-                    let msg = format!("Could not lock the state: {}", e);
-                    error!("{}", msg);
-                    tonic::Status::internal(msg)
-                })?;
-                let num_tasks = task_status.len();
-                for task_status in task_status {
-                    self.state
-                        .save_task_status(&task_status)
-                        .await
-                        .map_err(|e| {
-                            let msg = format!("Could not save task status: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
-                    if task_status.partition_id.is_some() {
-                        jobs.insert(task_status.partition_id.unwrap().job_id.clone());
-                    }
-                }
-                let mut executor_data = self
-                    .state
-                    .get_executor_data(&metadata.id)
-                    .await
-                    .map_err(|e| {
-                        let msg = format!(
-                            "Could not get metadata data for id {:?}: {}",
-                            &metadata.id, e
-                        );
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    })?;
-                executor_data.available_task_slots += num_tasks as u32;
+            let num_tasks = task_status.len();
+            for task_status in task_status {
                 self.state
-                    .save_executor_data(executor_data)
+                    .save_task_status(&task_status)
                     .await
                     .map_err(|e| {
-                        let msg = format!("Could not save metadata data: {}", e);
+                        let msg = format!("Could not save task status: {}", e);
                         error!("{}", msg);
                         tonic::Status::internal(msg)
                     })?;
-                lock.unlock().await;
-            }
-            let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
-            for job_id in jobs {
-                tx_job.send(job_id).await.unwrap();
+                if task_status.task_id.is_some() {
+                    jobs.insert(task_status.task_id.unwrap().job_id.clone());
+                }
             }
-            Ok(Response::new(UpdateTaskStatusResult { success: true }))
-        } else {
-            warn!("Received invalid task status update request");
-            Err(tonic::Status::invalid_argument(
-                "Missing metadata or task status in request",
-            ))
+            let mut executor_data = self.state.get_executor_data(&executor_id).unwrap();
+            executor_data.available_task_slots += num_tasks as u32;
+            self.state.save_executor_data(executor_data);
+        }
+        let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone();
+        for job_id in jobs {
+            tx_job.send(job_id).await.unwrap();

Review comment:
   ```suggestion
               tx_job.send(job_id).await?;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to