This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 2625c73c Replace Mutex<HashMap> by using DashMap (#449)
2625c73c is described below

commit 2625c73c57efcc26804218586d9e5d8000ea1ff1
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Oct 26 21:11:42 2022 +0800

    Replace Mutex<HashMap> by using DashMap (#449)
---
 ballista/executor/src/executor.rs    | 17 +++++-------
 ballista/scheduler/src/flight_sql.rs | 52 +++++++++++-------------------------
 2 files changed, 21 insertions(+), 48 deletions(-)

diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 47f5fe33..1c25baa3 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -17,6 +17,7 @@
 
 //! Ballista executor logic
 
+use dashmap::DashMap;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -35,9 +36,8 @@ use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use futures::future::AbortHandle;
 
 use ballista_core::serde::scheduler::PartitionId;
-use tokio::sync::Mutex;
 
-type AbortHandles = Arc<Mutex<HashMap<(usize, PartitionId), AbortHandle>>>;
+type AbortHandles = Arc<DashMap<(usize, PartitionId), AbortHandle>>;
 
 /// Ballista executor
 #[derive(Clone)]
@@ -106,17 +106,12 @@ impl Executor {
             shuffle_writer.execute_shuffle_write(partition.partition_id, task_ctx),
         );
 
-        {
-            let mut abort_handles = self.abort_handles.lock().await;
-            abort_handles.insert((task_id, partition.clone()), abort_handle);
-        }
+        self.abort_handles
+            .insert((task_id, partition.clone()), abort_handle);
 
         let partitions = task.await??;
 
-        self.abort_handles
-            .lock()
-            .await
-            .remove(&(task_id, partition.clone()));
+        self.abort_handles.remove(&(task_id, partition.clone()));
 
         self.metrics_collector.record_stage(
             &partition.job_id,
@@ -162,7 +157,7 @@ impl Executor {
         stage_id: usize,
         partition_id: usize,
     ) -> Result<bool, BallistaError> {
-        if let Some(handle) = self.abort_handles.lock().await.remove(&(
+        if let Some((_, handle)) = self.abort_handles.remove(&(
             task_id,
             PartitionId {
                 job_id,
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 7caca105..ce347900 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -31,11 +31,10 @@ use arrow_flight::{
     HandshakeResponse, Location, Ticket,
 };
 use log::{debug, error, warn};
-use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::pin::Pin;
 use std::str::FromStr;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::time::Duration;
 use tonic::{Request, Response, Status, Streaming};
 
@@ -51,6 +50,7 @@ use ballista_core::serde::protobuf::JobStatus;
 use ballista_core::serde::protobuf::PhysicalPlanNode;
 use ballista_core::serde::protobuf::SuccessfulJob;
 use ballista_core::utils::create_grpc_client_connection;
+use dashmap::DashMap;
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
@@ -66,16 +66,16 @@ use uuid::Uuid;
 
 pub struct FlightSqlServiceImpl {
     server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
-    statements: Arc<Mutex<HashMap<Uuid, LogicalPlan>>>,
-    contexts: Arc<Mutex<HashMap<Uuid, Arc<SessionContext>>>>,
+    statements: Arc<DashMap<Uuid, LogicalPlan>>,
+    contexts: Arc<DashMap<Uuid, Arc<SessionContext>>>,
 }
 
 impl FlightSqlServiceImpl {
     pub fn new(server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>) -> Self {
         Self {
             server,
-            statements: Arc::new(Mutex::new(HashMap::new())),
-            contexts: Arc::new(Mutex::new(HashMap::new())),
+            statements: Default::default(),
+            contexts: Default::default(),
         }
     }
 
@@ -94,11 +94,7 @@ impl FlightSqlServiceImpl {
                 Status::internal(format!("Failed to create SessionContext: {:?}", e))
             })?;
         let handle = Uuid::new_v4();
-        let mut contexts = self
-            .contexts
-            .try_lock()
-            .map_err(|e| Status::internal(format!("Error locking contexts: {}", e)))?;
-        contexts.insert(handle.clone(), ctx);
+        self.contexts.insert(handle.clone(), ctx);
         Ok(handle)
     }
 
@@ -119,19 +115,14 @@ impl FlightSqlServiceImpl {
 
         let handle = Uuid::from_str(auth.as_str())
             .map_err(|e| Status::internal(format!("Error locking contexts: {}", e)))?;
-        let contexts = self
-            .contexts
-            .try_lock()
-            .map_err(|e| Status::internal(format!("Error locking contexts: {}", e)))?;
-        let context = if let Some(context) = contexts.get(&handle) {
-            context
+        if let Some(context) = self.contexts.get(&handle) {
+            Ok(context.clone())
         } else {
             Err(Status::internal(format!(
                 "Context handle not found: {}",
                 handle
             )))?
-        };
-        Ok(context.clone())
+        }
     }
 
     async fn prepare_statement(
@@ -244,36 +235,23 @@ impl FlightSqlServiceImpl {
 
     fn cache_plan(&self, plan: LogicalPlan) -> Result<Uuid, Status> {
         let handle = Uuid::new_v4();
-        let mut statements = self
-            .statements
-            .try_lock()
-            .map_err(|e| Status::internal(format!("Error locking statements: {}", e)))?;
-        statements.insert(handle, plan);
+        self.statements.insert(handle, plan);
         Ok(handle)
     }
 
     fn get_plan(&self, handle: &Uuid) -> Result<LogicalPlan, Status> {
-        let statements = self
-            .statements
-            .try_lock()
-            .map_err(|e| Status::internal(format!("Error locking statements: {}", e)))?;
-        let plan = if let Some(plan) = statements.get(handle) {
-            plan
+        if let Some(plan) = self.statements.get(handle) {
+            Ok(plan.clone())
         } else {
             Err(Status::internal(format!(
                 "Statement handle not found: {}",
                 handle
             )))?
-        };
-        Ok(plan.clone())
+        }
     }
 
     fn remove_plan(&self, handle: Uuid) -> Result<(), Status> {
-        let mut statements = self
-            .statements
-            .try_lock()
-            .map_err(|e| Status::internal(format!("Error locking statements: {}", e)))?;
-        statements.remove(&handle);
+        self.statements.remove(&handle);
         Ok(())
     }
 

Reply via email to