This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 15526f9  Fix stuck issue for the load testing of Push-based task scheduling (#2006)
15526f9 is described below

commit 15526f9e6ae7c68dbc5e91feca404181d359172f
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 16 13:48:03 2022 +0800

    Fix stuck issue for the load testing of Push-based task scheduling (#2006)
    
    * Fix concurrently updating ExecutorData
    
    * Fix get_available_executors_data() to avoid losing event of SchedulerServerEvent::JobSubmitted in offer_resources()
    
    * Fix missing update JobStatus from Queued to Running
    
    * Fix for PR review
    
    Co-authored-by: yangzhong <[email protected]>
---
 ballista/rust/core/src/serde/scheduler/mod.rs      |  5 ++++
 .../scheduler/src/scheduler_server/event_loop.rs   | 33 +++++++++++++++++-----
 .../rust/scheduler/src/scheduler_server/grpc.rs    | 13 ++++++---
 .../src/scheduler_server/query_stage_scheduler.rs  | 19 +++++++++++--
 .../rust/scheduler/src/state/in_memory_state.rs    | 33 ++++++++++++++++++++--
 ballista/rust/scheduler/src/state/mod.rs           |  7 ++++-
 6 files changed, 94 insertions(+), 16 deletions(-)

diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index c304382..369a87d 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -147,6 +147,11 @@ pub struct ExecutorData {
     pub available_task_slots: u32,
 }
 
+pub struct ExecutorDataChange {
+    pub executor_id: String,
+    pub task_slots: i32,
+}
+
 struct ExecutorResourcePair {
     total: protobuf::executor_resource::Resource,
     available: protobuf::executor_resource::Resource,
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index 46d05f1..a7d656c 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -24,7 +24,7 @@ use log::{debug, warn};
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::EventAction;
 use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
-use ballista_core::serde::scheduler::ExecutorData;
+use ballista_core::serde::scheduler::ExecutorDataChange;
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
 
 use crate::scheduler_server::ExecutorsClient;
@@ -70,12 +70,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
             return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id)));
         }
 
+        let mut executors_data_change: Vec<ExecutorDataChange> = available_executors
+            .iter()
+            .map(|executor_data| ExecutorDataChange {
+                executor_id: executor_data.executor_id.clone(),
+                task_slots: executor_data.available_task_slots as i32,
+            })
+            .collect();
+
         let (tasks_assigment, num_tasks) = self
             .state
             .fetch_tasks(&mut available_executors, &job_id)
             .await?;
+        for (data_change, data) in executors_data_change
+            .iter_mut()
+            .zip(available_executors.iter())
+        {
+            data_change.task_slots =
+                data.available_task_slots as i32 - data_change.task_slots;
+        }
+
         if num_tasks > 0 {
-            self.launch_tasks(&available_executors, tasks_assigment)
+            self.launch_tasks(&executors_data_change, tasks_assigment)
                 .await?;
         }
 
@@ -84,12 +100,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 
     async fn launch_tasks(
         &self,
-        executors: &[ExecutorData],
+        executors: &[ExecutorDataChange],
         tasks_assigment: Vec<Vec<TaskDefinition>>,
     ) -> Result<()> {
         for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() {
             if !tasks.is_empty() {
-                let executor_data = &executors[idx_executor];
+                let executor_data_change = &executors[idx_executor];
                 debug!(
                     "Start to launch tasks {:?} to executor {:?}",
                     tasks
@@ -107,14 +123,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                             }
                         })
                         .collect::<Vec<String>>(),
-                    executor_data.executor_id
+                    executor_data_change.executor_id
                 );
                 let mut client = {
                     let clients = self.executors_client.read().await;
-                    clients.get(&executor_data.executor_id).unwrap().clone()
+                    clients
+                        .get(&executor_data_change.executor_id)
+                        .unwrap()
+                        .clone()
                 };
                 // Update the resources first
-                self.state.save_executor_data(executor_data.clone());
+                self.state.update_executor_data(executor_data_change);
                 // TODO check whether launching task is successful or not
                 client.launch_task(LaunchTaskParams { task: tasks }).await?;
             } else {
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index fe4eb36..6b96d41 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -30,7 +30,9 @@ use ballista_core::serde::protobuf::{
     TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use ballista_core::serde::scheduler::{
+    ExecutorData, ExecutorDataChange, ExecutorMetadata,
+};
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
@@ -290,9 +292,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     jobs.insert(task_id.job_id.clone());
                 }
             }
-            if let Some(mut executor_data) = self.state.get_executor_data(&executor_id) {
-                executor_data.available_task_slots += num_tasks as u32;
-                self.state.save_executor_data(executor_data);
+
+            if let Some(executor_data) = self.state.get_executor_data(&executor_id) {
+                self.state.update_executor_data(&ExecutorDataChange {
+                    executor_id: executor_data.executor_id,
+                    task_slots: num_tasks as i32,
+                });
             } else {
                 error!("Fail to get executor data for {:?}", &executor_id);
             }
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 5a2800c..31a0f9d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -19,12 +19,14 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use async_trait::async_trait;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use tokio::sync::RwLock;
 
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::event_loop::{EventAction, EventSender};
-use ballista_core::serde::protobuf::{PartitionId, TaskStatus};
+use ballista_core::serde::protobuf::{
+    job_status, JobStatus, PartitionId, RunningJob, TaskStatus,
+};
 use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::ExecutionPlan;
@@ -160,7 +162,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
     ) -> Result<Option<QueryStageSchedulerEvent>> {
         match event {
             QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => {
+                info!("Job {} submitted", job_id);
                 let plan = self.create_physical_plan(plan).await?;
+                if let Err(e) = self
+                    .state
+                    .save_job_metadata(
+                        &job_id,
+                        &JobStatus {
+                            status: Some(job_status::Status::Running(RunningJob {})),
+                        },
+                    )
+                    .await
+                {
+                    warn!("Could not update job {} status to running: {}", job_id, e);
+                }
                 self.generate_stages(&job_id, plan).await?;
 
                 if let Some(event_sender) = self.event_sender.as_ref() {
diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/in_memory_state.rs
index 4eac923..666d229 100644
--- a/ballista/rust/scheduler/src/state/in_memory_state.rs
+++ b/ballista/rust/scheduler/src/state/in_memory_state.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus};
-use ballista_core::serde::scheduler::ExecutorData;
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange};
+use log::{error, info, warn};
 use parking_lot::RwLock;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
@@ -85,6 +86,33 @@ impl InMemorySchedulerState {
         executors_data.insert(executor_data.executor_id.clone(), executor_data);
     }
 
+    pub(crate) fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) {
+        let mut executors_data = self.executors_data.write();
+        if let Some(executor_data) =
+            executors_data.get_mut(&executor_data_change.executor_id)
+        {
+            let available_task_slots = executor_data.available_task_slots as i32
+                + executor_data_change.task_slots;
+            if available_task_slots < 0 {
+                error!(
+                    "Available task slots {} for executor {} is less than 0",
+                    available_task_slots, executor_data.executor_id
+                );
+            } else {
+                info!(
+                    "available_task_slots for executor {} becomes {}",
+                    executor_data.executor_id, available_task_slots
+                );
+                executor_data.available_task_slots = available_task_slots as u32;
+            }
+        } else {
+            warn!(
+                "Could not find executor data for {}",
+                executor_data_change.executor_id
+            );
+        }
+    }
+
     pub(crate) fn get_executor_data(&self, executor_id: &str) -> Option<ExecutorData> {
         let executors_data = self.executors_data.read();
         executors_data.get(executor_id).cloned()
@@ -100,7 +128,8 @@ impl InMemorySchedulerState {
             executors_data
                 .iter()
                 .filter_map(|(exec, data)| {
-                    alive_executors.contains(exec).then(|| data.clone())
+                    (data.available_task_slots > 0 && alive_executors.contains(exec))
+                        .then(|| data.clone())
                 })
                 .collect::<Vec<ExecutorData>>()
         };
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index e015e5a..cc7252f 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{
     FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus,
 };
 use ballista_core::serde::scheduler::{
-    ExecutorData, ExecutorMetadata, PartitionId, PartitionStats,
+    ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId, PartitionStats,
 };
 use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec};
 use datafusion::prelude::ExecutionContext;
@@ -227,6 +227,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         self.in_memory_state.save_executor_data(executor_data);
     }
 
+    pub fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) {
+        self.in_memory_state
+            .update_executor_data(executor_data_change);
+    }
+
     pub fn get_available_executors_data(&self) -> Vec<ExecutorData> {
         self.in_memory_state.get_available_executors_data()
     }

Reply via email to