This is an automated email from the ASF dual-hosted git repository.

yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b4a1219 [fea] Avoid multithreaded write lock conflicts in event queue (#754)
9b4a1219 is described below

commit 9b4a12191aac37bda6d0c5e933377698687872b9
Author: Yang Jiang <[email protected]>
AuthorDate: Mon May 8 18:28:41 2023 +0800

    [fea] Avoid multithreaded write lock conflicts in event queue (#754)
---
 ballista/scheduler/src/scheduler_server/event.rs   | 10 ++-
 ballista/scheduler/src/scheduler_server/mod.rs     | 13 +++-
 .../src/scheduler_server/query_stage_scheduler.rs  | 84 +++++++++++++++-------
 ballista/scheduler/src/state/mod.rs                | 13 ++--
 4 files changed, 81 insertions(+), 39 deletions(-)

diff --git a/ballista/scheduler/src/scheduler_server/event.rs b/ballista/scheduler/src/scheduler_server/event.rs
index 2f428665..ed828ec0 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -22,6 +22,7 @@ use datafusion::logical_expr::LogicalPlan;
 
 use crate::state::execution_graph::RunningTaskInfo;
 use ballista_core::serde::protobuf::TaskStatus;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use std::sync::Arc;
 
@@ -36,9 +37,12 @@ pub enum QueryStageSchedulerEvent {
     },
     JobSubmitted {
         job_id: String,
+        job_name: String,
+        session_id: String,
         queued_at: u64,
         submitted_at: u64,
         resubmit: bool,
+        plan: Arc<dyn ExecutionPlan>,
     },
     // For a job which failed during planning
     JobPlanningFailed {
@@ -76,8 +80,10 @@ impl Debug for QueryStageSchedulerEvent {
             } => {
                 write!(f, "JobQueued : job_id={job_id}, job_name={job_name}.")
             }
-            QueryStageSchedulerEvent::JobSubmitted { job_id, .. } => {
-                write!(f, "JobSubmitted : job_id={job_id}.")
+            QueryStageSchedulerEvent::JobSubmitted {
+                job_id, resubmit, ..
+            } => {
+                write!(f, "JobSubmitted : job_id={job_id}, resubmit={resubmit}.")
             }
             QueryStageSchedulerEvent::JobPlanningFailed {
                 job_id,
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index 79e453cb..bb0e9b85 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -440,13 +440,20 @@ mod test {
             .queue_job(job_id, "", timestamp_millis())
             .await?;
 
-        // Submit job
-        scheduler
+        // Plan job
+        let plan = scheduler
             .state
-            .submit_job(job_id, "", ctx, &plan, 0)
+            .plan_job(job_id, ctx.clone(), &plan)
             .await
             .expect("submitting plan");
 
+        //Submit job plan
+        scheduler
+            .state
+            .task_manager
+            .submit_job(job_id, "", &ctx.session_id(), plan, 0)
+            .await?;
+
         // Refresh the ExecutionGraph
         while let Some(graph) = scheduler
             .state
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index bacde478..eb6f7504 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -119,26 +119,29 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
 
                 let state = self.state.clone();
                 tokio::spawn(async move {
-                    let event = if let Err(e) = state
-                        .submit_job(&job_id, &job_name, session_ctx, &plan, queued_at)
-                        .await
-                    {
-                        let fail_message = format!("Error planning job {job_id}: {e:?}");
-                        error!("{}", &fail_message);
-                        QueryStageSchedulerEvent::JobPlanningFailed {
-                            job_id,
-                            fail_message,
-                            queued_at,
-                            failed_at: timestamp_millis(),
-                        }
-                    } else {
-                        QueryStageSchedulerEvent::JobSubmitted {
-                            job_id,
-                            queued_at,
-                            submitted_at: timestamp_millis(),
-                            resubmit: false,
-                        }
-                    };
+                    let event =
+                        match state.plan_job(&job_id, session_ctx.clone(), &plan).await {
+                            Ok(plan) => QueryStageSchedulerEvent::JobSubmitted {
+                                job_id,
+                                job_name,
+                                session_id: session_ctx.session_id(),
+                                queued_at,
+                                submitted_at: timestamp_millis(),
+                                resubmit: false,
+                                plan,
+                            },
+                            Err(error) => {
+                                let fail_message =
+                                    format!("Error planning job {job_id}: {error:?}");
+                                error!("{}", &fail_message);
+                                QueryStageSchedulerEvent::JobPlanningFailed {
+                                    job_id,
+                                    fail_message,
+                                    queued_at,
+                                    failed_at: timestamp_millis(),
+                                }
+                            }
+                        };
                     if let Err(e) = tx_event.post_event(event).await {
                         error!("Fail to send event due to {}", e);
                     }
@@ -146,9 +149,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
             }
             QueryStageSchedulerEvent::JobSubmitted {
                 job_id,
+                job_name,
+                session_id,
                 queued_at,
                 submitted_at,
                 resubmit,
+                plan,
             } => {
                 if !resubmit {
                     self.metrics_collector.record_submitted(
@@ -156,7 +162,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                         queued_at,
                         submitted_at,
                     );
-
+                    self.state
+                        .task_manager
+                        .submit_job(
+                            job_id.as_str(),
+                            job_name.as_str(),
+                            session_id.as_str(),
+                            plan.clone(),
+                            queued_at,
+                        )
+                        .await?;
                     info!("Job {} submitted", job_id);
                 } else {
                     debug!("Job {} resubmitted", job_id);
@@ -192,9 +207,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
                             if let Err(e) = tx_event
                                 .post_event(QueryStageSchedulerEvent::JobSubmitted {
                                     job_id,
+                                    job_name,
+                                    session_id,
                                     queued_at,
                                     submitted_at,
                                     resubmit: true,
+                                    plan: plan.clone(),
                                 })
                                 .await
                             {
@@ -387,6 +405,7 @@ mod tests {
     use ballista_core::event_loop::EventAction;
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
     use datafusion::logical_expr::{col, sum, LogicalPlan};
+    use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
@@ -418,15 +437,26 @@ mod tests {
 
         let event = QueryStageSchedulerEvent::JobSubmitted {
             job_id: "job-id".to_string(),
+            job_name: "job-name".to_string(),
+            session_id: "session-id".to_string(),
             queued_at: 0,
             submitted_at: 0,
             resubmit: false,
+            plan: Arc::new(EmptyExec::new(false, Arc::new(test_schema()))),
         };
 
+        // Mock the JobQueued work.
+        query_stage_scheduler
+            .state
+            .task_manager
+            .queue_job("job-id", "job-name", 0)
+            .await?;
+
         query_stage_scheduler.on_receive(event, &tx, &rx).await?;
 
         let next_event = rx.recv().await.unwrap();
 
+        dbg!(next_event.clone());
         assert!(matches!(
             next_event,
             QueryStageSchedulerEvent::JobSubmitted { job_id, resubmit, .. } if job_id == "job-id" && resubmit
@@ -540,10 +570,7 @@ mod tests {
     }
 
     fn test_plan(partitions: usize) -> LogicalPlan {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Utf8, false),
-            Field::new("gmv", DataType::UInt64, false),
-        ]);
+        let schema = test_schema();
 
         scan_empty_with_partitions(None, &schema, Some(vec![0, 1]), partitions)
             .unwrap()
@@ -552,4 +579,11 @@ mod tests {
             .build()
             .unwrap()
     }
+
+    fn test_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("id", DataType::Utf8, false),
+            Field::new("gmv", DataType::UInt64, false),
+        ])
+    }
 }
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 483828cc..03f53b76 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -38,6 +38,7 @@ use ballista_core::serde::protobuf::TaskStatus;
 use ballista_core::serde::BallistaCodec;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -311,14 +312,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         executor_stage_assignments
     }
 
-    pub(crate) async fn submit_job(
+    pub(crate) async fn plan_job(
         &self,
         job_id: &str,
-        job_name: &str,
         session_ctx: Arc<SessionContext>,
         plan: &LogicalPlan,
-        queued_at: u64,
-    ) -> Result<()> {
+    ) -> Result<Arc<dyn ExecutionPlan>> {
         let start = Instant::now();
 
         if log::max_level() >= log::Level::Debug {
@@ -373,15 +372,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             DisplayableExecutionPlan::new(plan.as_ref()).indent()
         );
 
-        self.task_manager
-            .submit_job(job_id, job_name, &session_ctx.session_id(), plan, queued_at)
-            .await?;
-
         let elapsed = start.elapsed();
 
         info!("Planned job {} in {:?}", job_id, elapsed);
 
-        Ok(())
+        Ok(plan)
     }
 
     /// Spawn a delayed future to clean up job data on both Scheduler and Executors

Reply via email to