This is an automated email from the ASF dual-hosted git repository.
yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 9b4a1219 [fea] Avoid multithreaded write lock conflicts in event queue
(#754)
9b4a1219 is described below
commit 9b4a12191aac37bda6d0c5e933377698687872b9
Author: Yang Jiang <[email protected]>
AuthorDate: Mon May 8 18:28:41 2023 +0800
[fea] Avoid multithreaded write lock conflicts in event queue (#754)
---
ballista/scheduler/src/scheduler_server/event.rs | 10 ++-
ballista/scheduler/src/scheduler_server/mod.rs | 13 +++-
.../src/scheduler_server/query_stage_scheduler.rs | 84 +++++++++++++++-------
ballista/scheduler/src/state/mod.rs | 13 ++--
4 files changed, 81 insertions(+), 39 deletions(-)
diff --git a/ballista/scheduler/src/scheduler_server/event.rs
b/ballista/scheduler/src/scheduler_server/event.rs
index 2f428665..ed828ec0 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -22,6 +22,7 @@ use datafusion::logical_expr::LogicalPlan;
use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::TaskStatus;
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use std::sync::Arc;
@@ -36,9 +37,12 @@ pub enum QueryStageSchedulerEvent {
},
JobSubmitted {
job_id: String,
+ job_name: String,
+ session_id: String,
queued_at: u64,
submitted_at: u64,
resubmit: bool,
+ plan: Arc<dyn ExecutionPlan>,
},
// For a job which failed during planning
JobPlanningFailed {
@@ -76,8 +80,10 @@ impl Debug for QueryStageSchedulerEvent {
} => {
write!(f, "JobQueued : job_id={job_id}, job_name={job_name}.")
}
- QueryStageSchedulerEvent::JobSubmitted { job_id, .. } => {
- write!(f, "JobSubmitted : job_id={job_id}.")
+ QueryStageSchedulerEvent::JobSubmitted {
+ job_id, resubmit, ..
+ } => {
+ write!(f, "JobSubmitted : job_id={job_id}, resubmit={resubmit}.")
}
QueryStageSchedulerEvent::JobPlanningFailed {
job_id,
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index 79e453cb..bb0e9b85 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -440,13 +440,20 @@ mod test {
.queue_job(job_id, "", timestamp_millis())
.await?;
- // Submit job
- scheduler
+ // Plan job
+ let plan = scheduler
.state
- .submit_job(job_id, "", ctx, &plan, 0)
+ .plan_job(job_id, ctx.clone(), &plan)
.await
.expect("submitting plan");
+ //Submit job plan
+ scheduler
+ .state
+ .task_manager
+ .submit_job(job_id, "", &ctx.session_id(), plan, 0)
+ .await?;
+
// Refresh the ExecutionGraph
while let Some(graph) = scheduler
.state
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index bacde478..eb6f7504 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -119,26 +119,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
let state = self.state.clone();
tokio::spawn(async move {
- let event = if let Err(e) = state
- .submit_job(&job_id, &job_name, session_ctx, &plan, queued_at)
- .await
- {
- let fail_message = format!("Error planning job {job_id}: {e:?}");
- error!("{}", &fail_message);
- QueryStageSchedulerEvent::JobPlanningFailed {
- job_id,
- fail_message,
- queued_at,
- failed_at: timestamp_millis(),
- }
- } else {
- QueryStageSchedulerEvent::JobSubmitted {
- job_id,
- queued_at,
- submitted_at: timestamp_millis(),
- resubmit: false,
- }
- };
+ let event =
+ match state.plan_job(&job_id, session_ctx.clone(), &plan).await {
+ Ok(plan) => QueryStageSchedulerEvent::JobSubmitted
{
+ job_id,
+ job_name,
+ session_id: session_ctx.session_id(),
+ queued_at,
+ submitted_at: timestamp_millis(),
+ resubmit: false,
+ plan,
+ },
+ Err(error) => {
+ let fail_message =
+ format!("Error planning job {job_id}: {error:?}");
+ error!("{}", &fail_message);
+ QueryStageSchedulerEvent::JobPlanningFailed {
+ job_id,
+ fail_message,
+ queued_at,
+ failed_at: timestamp_millis(),
+ }
+ }
+ };
if let Err(e) = tx_event.post_event(event).await {
error!("Fail to send event due to {}", e);
}
@@ -146,9 +149,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
}
QueryStageSchedulerEvent::JobSubmitted {
job_id,
+ job_name,
+ session_id,
queued_at,
submitted_at,
resubmit,
+ plan,
} => {
if !resubmit {
self.metrics_collector.record_submitted(
@@ -156,7 +162,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
queued_at,
submitted_at,
);
-
+ self.state
+ .task_manager
+ .submit_job(
+ job_id.as_str(),
+ job_name.as_str(),
+ session_id.as_str(),
+ plan.clone(),
+ queued_at,
+ )
+ .await?;
info!("Job {} submitted", job_id);
} else {
debug!("Job {} resubmitted", job_id);
@@ -192,9 +207,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan>
if let Err(e) = tx_event
.post_event(QueryStageSchedulerEvent::JobSubmitted {
job_id,
+ job_name,
+ session_id,
queued_at,
submitted_at,
resubmit: true,
+ plan: plan.clone(),
})
.await
{
@@ -387,6 +405,7 @@ mod tests {
use ballista_core::event_loop::EventAction;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
+ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
@@ -418,15 +437,26 @@ mod tests {
let event = QueryStageSchedulerEvent::JobSubmitted {
job_id: "job-id".to_string(),
+ job_name: "job-name".to_string(),
+ session_id: "session-id".to_string(),
queued_at: 0,
submitted_at: 0,
resubmit: false,
+ plan: Arc::new(EmptyExec::new(false, Arc::new(test_schema()))),
};
+ // Mock the JobQueued work.
+ query_stage_scheduler
+ .state
+ .task_manager
+ .queue_job("job-id", "job-name", 0)
+ .await?;
+
query_stage_scheduler.on_receive(event, &tx, &rx).await?;
let next_event = rx.recv().await.unwrap();
+ dbg!(next_event.clone());
assert!(matches!(
next_event,
QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if
job_id == "job-id" && resubmit
@@ -540,10 +570,7 @@ mod tests {
}
fn test_plan(partitions: usize) -> LogicalPlan {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Utf8, false),
- Field::new("gmv", DataType::UInt64, false),
- ]);
+ let schema = test_schema();
scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
.unwrap()
@@ -552,4 +579,11 @@ mod tests {
.build()
.unwrap()
}
+
+ fn test_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("id", DataType::Utf8, false),
+ Field::new("gmv", DataType::UInt64, false),
+ ])
+ }
}
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index 483828cc..03f53b76 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -38,6 +38,7 @@ use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::BallistaCodec;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -311,14 +312,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
executor_stage_assignments
}
- pub(crate) async fn submit_job(
+ pub(crate) async fn plan_job(
&self,
job_id: &str,
- job_name: &str,
session_ctx: Arc<SessionContext>,
plan: &LogicalPlan,
- queued_at: u64,
- ) -> Result<()> {
+ ) -> Result<Arc<dyn ExecutionPlan>> {
let start = Instant::now();
if log::max_level() >= log::Level::Debug {
@@ -373,15 +372,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
DisplayableExecutionPlan::new(plan.as_ref()).indent()
);
- self.task_manager
- .submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
- .await?;
-
let elapsed = start.elapsed();
info!("Planned job {} in {:?}", job_id, elapsed);
- Ok(())
+ Ok(plan)
}
/// Spawn a delayed future to clean up job data on both Scheduler and
Executors