This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 15526f9 Fix stuck issue for the load testing of Push-based task
scheduling (#2006)
15526f9 is described below
commit 15526f9e6ae7c68dbc5e91feca404181d359172f
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 16 13:48:03 2022 +0800
Fix stuck issue for the load testing of Push-based task scheduling (#2006)
* Fix race condition when concurrently updating ExecutorData
* Fix get_available_executors_data() to avoid losing the
SchedulerServerEvent::JobSubmitted event in offer_resources()
* Fix missing update of JobStatus from Queued to Running
* Fix for PR review
Co-authored-by: yangzhong <[email protected]>
---
ballista/rust/core/src/serde/scheduler/mod.rs | 5 ++++
.../scheduler/src/scheduler_server/event_loop.rs | 33 +++++++++++++++++-----
.../rust/scheduler/src/scheduler_server/grpc.rs | 13 ++++++---
.../src/scheduler_server/query_stage_scheduler.rs | 19 +++++++++++--
.../rust/scheduler/src/state/in_memory_state.rs | 33 ++++++++++++++++++++--
ballista/rust/scheduler/src/state/mod.rs | 7 ++++-
6 files changed, 94 insertions(+), 16 deletions(-)
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs
b/ballista/rust/core/src/serde/scheduler/mod.rs
index c304382..369a87d 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -147,6 +147,11 @@ pub struct ExecutorData {
pub available_task_slots: u32,
}
+pub struct ExecutorDataChange {
+ pub executor_id: String,
+ pub task_slots: i32,
+}
+
struct ExecutorResourcePair {
total: protobuf::executor_resource::Resource,
available: protobuf::executor_resource::Resource,
diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
index 46d05f1..a7d656c 100644
--- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs
@@ -24,7 +24,7 @@ use log::{debug, warn};
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::EventAction;
use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
-use ballista_core::serde::scheduler::ExecutorData;
+use ballista_core::serde::scheduler::ExecutorDataChange;
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use crate::scheduler_server::ExecutorsClient;
@@ -70,12 +70,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id)));
}
+ let mut executors_data_change: Vec<ExecutorDataChange> =
available_executors
+ .iter()
+ .map(|executor_data| ExecutorDataChange {
+ executor_id: executor_data.executor_id.clone(),
+ task_slots: executor_data.available_task_slots as i32,
+ })
+ .collect();
+
let (tasks_assigment, num_tasks) = self
.state
.fetch_tasks(&mut available_executors, &job_id)
.await?;
+ for (data_change, data) in executors_data_change
+ .iter_mut()
+ .zip(available_executors.iter())
+ {
+ data_change.task_slots =
+ data.available_task_slots as i32 - data_change.task_slots;
+ }
+
if num_tasks > 0 {
- self.launch_tasks(&available_executors, tasks_assigment)
+ self.launch_tasks(&executors_data_change, tasks_assigment)
.await?;
}
@@ -84,12 +100,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
async fn launch_tasks(
&self,
- executors: &[ExecutorData],
+ executors: &[ExecutorDataChange],
tasks_assigment: Vec<Vec<TaskDefinition>>,
) -> Result<()> {
for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() {
if !tasks.is_empty() {
- let executor_data = &executors[idx_executor];
+ let executor_data_change = &executors[idx_executor];
debug!(
"Start to launch tasks {:?} to executor {:?}",
tasks
@@ -107,14 +123,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
}
})
.collect::<Vec<String>>(),
- executor_data.executor_id
+ executor_data_change.executor_id
);
let mut client = {
let clients = self.executors_client.read().await;
- clients.get(&executor_data.executor_id).unwrap().clone()
+ clients
+ .get(&executor_data_change.executor_id)
+ .unwrap()
+ .clone()
};
// Update the resources first
- self.state.save_executor_data(executor_data.clone());
+ self.state.update_executor_data(executor_data_change);
// TODO check whether launching task is successful or not
client.launch_task(LaunchTaskParams { task: tasks }).await?;
} else {
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index fe4eb36..6b96d41 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -30,7 +30,9 @@ use ballista_core::serde::protobuf::{
TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
+use ballista_core::serde::scheduler::{
+ ExecutorData, ExecutorDataChange, ExecutorMetadata,
+};
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
@@ -290,9 +292,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
jobs.insert(task_id.job_id.clone());
}
}
- if let Some(mut executor_data) =
self.state.get_executor_data(&executor_id) {
- executor_data.available_task_slots += num_tasks as u32;
- self.state.save_executor_data(executor_data);
+
+ if let Some(executor_data) =
self.state.get_executor_data(&executor_id) {
+ self.state.update_executor_data(&ExecutorDataChange {
+ executor_id: executor_data.executor_id,
+ task_slots: num_tasks as i32,
+ });
} else {
error!("Fail to get executor data for {:?}", &executor_id);
}
diff --git
a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 5a2800c..31a0f9d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -19,12 +19,14 @@ use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
use tokio::sync::RwLock;
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventSender};
-use ballista_core::serde::protobuf::{PartitionId, TaskStatus};
+use ballista_core::serde::protobuf::{
+ job_status, JobStatus, PartitionId, RunningJob, TaskStatus,
+};
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
@@ -160,7 +162,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
) -> Result<Option<QueryStageSchedulerEvent>> {
match event {
QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => {
+ info!("Job {} submitted", job_id);
let plan = self.create_physical_plan(plan).await?;
+ if let Err(e) = self
+ .state
+ .save_job_metadata(
+ &job_id,
+ &JobStatus {
+ status:
Some(job_status::Status::Running(RunningJob {})),
+ },
+ )
+ .await
+ {
+ warn!("Could not update job {} status to running: {}",
job_id, e);
+ }
self.generate_stages(&job_id, plan).await?;
if let Some(event_sender) = self.event_sender.as_ref() {
diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs
b/ballista/rust/scheduler/src/state/in_memory_state.rs
index 4eac923..666d229 100644
--- a/ballista/rust/scheduler/src/state/in_memory_state.rs
+++ b/ballista/rust/scheduler/src/state/in_memory_state.rs
@@ -16,7 +16,8 @@
// under the License.
use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus};
-use ballista_core::serde::scheduler::ExecutorData;
+use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange};
+use log::{error, info, warn};
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -85,6 +86,33 @@ impl InMemorySchedulerState {
executors_data.insert(executor_data.executor_id.clone(),
executor_data);
}
+ pub(crate) fn update_executor_data(&self, executor_data_change:
&ExecutorDataChange) {
+ let mut executors_data = self.executors_data.write();
+ if let Some(executor_data) =
+ executors_data.get_mut(&executor_data_change.executor_id)
+ {
+ let available_task_slots = executor_data.available_task_slots as
i32
+ + executor_data_change.task_slots;
+ if available_task_slots < 0 {
+ error!(
+ "Available task slots {} for executor {} is less than 0",
+ available_task_slots, executor_data.executor_id
+ );
+ } else {
+ info!(
+ "available_task_slots for executor {} becomes {}",
+ executor_data.executor_id, available_task_slots
+ );
+ executor_data.available_task_slots = available_task_slots as
u32;
+ }
+ } else {
+ warn!(
+ "Could not find executor data for {}",
+ executor_data_change.executor_id
+ );
+ }
+ }
+
pub(crate) fn get_executor_data(&self, executor_id: &str) ->
Option<ExecutorData> {
let executors_data = self.executors_data.read();
executors_data.get(executor_id).cloned()
@@ -100,7 +128,8 @@ impl InMemorySchedulerState {
executors_data
.iter()
.filter_map(|(exec, data)| {
- alive_executors.contains(exec).then(|| data.clone())
+ (data.available_task_slots > 0 &&
alive_executors.contains(exec))
+ .then(|| data.clone())
})
.collect::<Vec<ExecutorData>>()
};
diff --git a/ballista/rust/scheduler/src/state/mod.rs
b/ballista/rust/scheduler/src/state/mod.rs
index e015e5a..cc7252f 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{
FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus,
};
use ballista_core::serde::scheduler::{
- ExecutorData, ExecutorMetadata, PartitionId, PartitionStats,
+ ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId,
PartitionStats,
};
use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan,
BallistaCodec};
use datafusion::prelude::ExecutionContext;
@@ -227,6 +227,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
self.in_memory_state.save_executor_data(executor_data);
}
+ pub fn update_executor_data(&self, executor_data_change:
&ExecutorDataChange) {
+ self.in_memory_state
+ .update_executor_data(executor_data_change);
+ }
+
pub fn get_available_executors_data(&self) -> Vec<ExecutorData> {
self.in_memory_state.get_available_executors_data()
}