This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e8aff59 Remove unecessary Mutex in Ballista client (#1898)
e8aff59 is described below
commit e8aff599607b05ca9be147724d820531158209f2
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Mar 6 07:08:24 2022 -0500
Remove unecessary Mutex in Ballista client (#1898)
---
ballista/rust/core/src/client.rs | 13 ++++---------
1 file changed, 4 insertions(+), 9 deletions(-)
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index ccbaea8..5441888 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -17,7 +17,6 @@
//! Client API for sending requests to executors.
-use parking_lot::Mutex;
use std::sync::Arc;
use std::{
@@ -130,16 +129,13 @@ impl BallistaClient {
}
struct FlightDataStream {
- stream: Mutex<Streaming<FlightData>>,
+ stream: Streaming<FlightData>,
schema: SchemaRef,
}
impl FlightDataStream {
pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
- Self {
- stream: Mutex::new(stream),
- schema,
- }
+ Self { stream, schema }
}
}
@@ -147,11 +143,10 @@ impl Stream for FlightDataStream {
type Item = ArrowResult<RecordBatch>;
fn poll_next(
- self: std::pin::Pin<&mut Self>,
+ mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
- let mut stream = self.stream.lock();
- stream.poll_next_unpin(cx).map(|x| match x {
+ self.stream.poll_next_unpin(cx).map(|x| match x {
Some(flight_data_chunk_result) => {
let converted_chunk = flight_data_chunk_result
.map_err(|e| ArrowError::from_external_error(Box::new(e)))