This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 407de2a Remove unnecessary mutex (#639)
407de2a is described below
commit 407de2a60550d1fcb36fe6da2e77dde6ddb3621c
Author: Ximo Guanter <[email protected]>
AuthorDate: Mon Jun 28 16:25:15 2021 +0200
Remove unnecessary mutex (#639)
---
ballista/rust/core/src/execution_plans/query_stage.rs | 11 ++++-------
1 file changed, 4 insertions(+), 7 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/query_stage.rs
index c117110..1e91540 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -184,7 +184,7 @@ impl ExecutionPlan for QueryStageExec {
// we won't necessary produce output for every possible partition, so we
// create writers on demand
- let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+ let mut writers: Vec<Option<ShuffleWriter>> = vec![];
for _ in 0..num_output_partitions {
writers.push(None);
}
@@ -229,9 +229,8 @@ impl ExecutionPlan for QueryStageExec {
RecordBatch::try_new(input_batch.schema(),
columns)?;
// write batch out
- match &writers[num_output_partition] {
+ match &mut writers[num_output_partition] {
Some(w) => {
- let mut w = w.lock().unwrap();
w.write(&output_batch)?;
}
None => {
@@ -247,8 +246,7 @@ impl ExecutionPlan for QueryStageExec {
ShuffleWriter::new(path,
stream.schema().as_ref())?;
writer.write(&output_batch)?;
- writers[num_output_partition] =
- Some(Arc::new(Mutex::new(writer)));
+ writers[num_output_partition] = Some(writer);
}
}
}
@@ -262,10 +260,9 @@ impl ExecutionPlan for QueryStageExec {
let mut num_batches_builder = UInt64Builder::new(num_writers);
let mut num_bytes_builder = UInt64Builder::new(num_writers);
- for (i, w) in writers.iter().enumerate() {
+ for (i, w) in writers.iter_mut().enumerate() {
match w {
Some(w) => {
- let mut w = w.lock().unwrap();
w.finish()?;
path_builder.append_value(w.path())?;
partition_builder.append_value(i as u32)?;