This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 2625c73c Replace Mutex<HashMap> by using DashMap (#449)
2625c73c is described below
commit 2625c73c57efcc26804218586d9e5d8000ea1ff1
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Oct 26 21:11:42 2022 +0800
Replace Mutex<HashMap> by using DashMap (#449)
---
ballista/executor/src/executor.rs | 17 +++++-------
ballista/scheduler/src/flight_sql.rs | 52 +++++++++++-------------------------
2 files changed, 21 insertions(+), 48 deletions(-)
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 47f5fe33..1c25baa3 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -17,6 +17,7 @@
//! Ballista executor logic
+use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;
@@ -35,9 +36,8 @@ use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use futures::future::AbortHandle;
use ballista_core::serde::scheduler::PartitionId;
-use tokio::sync::Mutex;
-type AbortHandles = Arc<Mutex<HashMap<(usize, PartitionId), AbortHandle>>>;
+type AbortHandles = Arc<DashMap<(usize, PartitionId), AbortHandle>>;
/// Ballista executor
#[derive(Clone)]
@@ -106,17 +106,12 @@ impl Executor {
shuffle_writer.execute_shuffle_write(partition.partition_id,
task_ctx),
);
- {
- let mut abort_handles = self.abort_handles.lock().await;
- abort_handles.insert((task_id, partition.clone()), abort_handle);
- }
+ self.abort_handles
+ .insert((task_id, partition.clone()), abort_handle);
let partitions = task.await??;
- self.abort_handles
- .lock()
- .await
- .remove(&(task_id, partition.clone()));
+ self.abort_handles.remove(&(task_id, partition.clone()));
self.metrics_collector.record_stage(
&partition.job_id,
@@ -162,7 +157,7 @@ impl Executor {
stage_id: usize,
partition_id: usize,
) -> Result<bool, BallistaError> {
- if let Some(handle) = self.abort_handles.lock().await.remove(&(
+ if let Some((_, handle)) = self.abort_handles.remove(&(
task_id,
PartitionId {
job_id,
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index 7caca105..ce347900 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -31,11 +31,10 @@ use arrow_flight::{
HandshakeResponse, Location, Ticket,
};
use log::{debug, error, warn};
-use std::collections::HashMap;
use std::convert::TryFrom;
use std::pin::Pin;
use std::str::FromStr;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::Duration;
use tonic::{Request, Response, Status, Streaming};
@@ -51,6 +50,7 @@ use ballista_core::serde::protobuf::JobStatus;
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::protobuf::SuccessfulJob;
use ballista_core::utils::create_grpc_client_connection;
+use dashmap::DashMap;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::ipc::writer::{IpcDataGenerator, IpcWriteOptions};
@@ -66,16 +66,16 @@ use uuid::Uuid;
pub struct FlightSqlServiceImpl {
server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>,
- statements: Arc<Mutex<HashMap<Uuid, LogicalPlan>>>,
- contexts: Arc<Mutex<HashMap<Uuid, Arc<SessionContext>>>>,
+ statements: Arc<DashMap<Uuid, LogicalPlan>>,
+ contexts: Arc<DashMap<Uuid, Arc<SessionContext>>>,
}
impl FlightSqlServiceImpl {
pub fn new(server: SchedulerServer<LogicalPlanNode, PhysicalPlanNode>) ->
Self {
Self {
server,
- statements: Arc::new(Mutex::new(HashMap::new())),
- contexts: Arc::new(Mutex::new(HashMap::new())),
+ statements: Default::default(),
+ contexts: Default::default(),
}
}
@@ -94,11 +94,7 @@ impl FlightSqlServiceImpl {
Status::internal(format!("Failed to create SessionContext:
{:?}", e))
})?;
let handle = Uuid::new_v4();
- let mut contexts = self
- .contexts
- .try_lock()
- .map_err(|e| Status::internal(format!("Error locking contexts:
{}", e)))?;
- contexts.insert(handle.clone(), ctx);
+ self.contexts.insert(handle.clone(), ctx);
Ok(handle)
}
@@ -119,19 +115,14 @@ impl FlightSqlServiceImpl {
let handle = Uuid::from_str(auth.as_str())
.map_err(|e| Status::internal(format!("Error locking contexts:
{}", e)))?;
- let contexts = self
- .contexts
- .try_lock()
- .map_err(|e| Status::internal(format!("Error locking contexts:
{}", e)))?;
- let context = if let Some(context) = contexts.get(&handle) {
- context
+ if let Some(context) = self.contexts.get(&handle) {
+ Ok(context.clone())
} else {
Err(Status::internal(format!(
"Context handle not found: {}",
handle
)))?
- };
- Ok(context.clone())
+ }
}
async fn prepare_statement(
@@ -244,36 +235,23 @@ impl FlightSqlServiceImpl {
fn cache_plan(&self, plan: LogicalPlan) -> Result<Uuid, Status> {
let handle = Uuid::new_v4();
- let mut statements = self
- .statements
- .try_lock()
- .map_err(|e| Status::internal(format!("Error locking statements:
{}", e)))?;
- statements.insert(handle, plan);
+ self.statements.insert(handle, plan);
Ok(handle)
}
fn get_plan(&self, handle: &Uuid) -> Result<LogicalPlan, Status> {
- let statements = self
- .statements
- .try_lock()
- .map_err(|e| Status::internal(format!("Error locking statements:
{}", e)))?;
- let plan = if let Some(plan) = statements.get(handle) {
- plan
+ if let Some(plan) = self.statements.get(handle) {
+ Ok(plan.clone())
} else {
Err(Status::internal(format!(
"Statement handle not found: {}",
handle
)))?
- };
- Ok(plan.clone())
+ }
}
fn remove_plan(&self, handle: Uuid) -> Result<(), Status> {
- let mut statements = self
- .statements
- .try_lock()
- .map_err(|e| Status::internal(format!("Error locking statements:
{}", e)))?;
- statements.remove(&handle);
+ self.statements.remove(&handle);
Ok(())
}