This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e8aff59  Remove unnecessary Mutex in Ballista client (#1898)
e8aff59 is described below

commit e8aff599607b05ca9be147724d820531158209f2
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Mar 6 07:08:24 2022 -0500

    Remove unnecessary Mutex in Ballista client (#1898)
---
 ballista/rust/core/src/client.rs | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index ccbaea8..5441888 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -17,7 +17,6 @@
 
 //! Client API for sending requests to executors.
 
-use parking_lot::Mutex;
 use std::sync::Arc;
 
 use std::{
@@ -130,16 +129,13 @@ impl BallistaClient {
 }
 
 struct FlightDataStream {
-    stream: Mutex<Streaming<FlightData>>,
+    stream: Streaming<FlightData>,
     schema: SchemaRef,
 }
 
 impl FlightDataStream {
     pub fn new(stream: Streaming<FlightData>, schema: SchemaRef) -> Self {
-        Self {
-            stream: Mutex::new(stream),
-            schema,
-        }
+        Self { stream, schema }
     }
 }
 
@@ -147,11 +143,10 @@ impl Stream for FlightDataStream {
     type Item = ArrowResult<RecordBatch>;
 
     fn poll_next(
-        self: std::pin::Pin<&mut Self>,
+        mut self: std::pin::Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let mut stream = self.stream.lock();
-        stream.poll_next_unpin(cx).map(|x| match x {
+        self.stream.poll_next_unpin(cx).map(|x| match x {
             Some(flight_data_chunk_result) => {
                 let converted_chunk = flight_data_chunk_result
                     .map_err(|e| ArrowError::from_external_error(Box::new(e)))

Reply via email to