This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bbad36f7d Minor: move `FallibleRequestStream` and 
`FallibleTonicResponseStream` to a module (#6258)
0bbad36f7d is described below

commit 0bbad36f7d462a7f199fa12dd7f290f8dac40507
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Aug 20 14:45:47 2024 -0400

    Minor: move `FallibleRequestStream` and `FallibleTonicResponseStream` to a 
module (#6258)
    
    * Minor: move FallibleRequestStream and FallibleTonicResponseStream to 
their own modules
    
    * Improve documentation and add links
---
 arrow-flight/src/client.rs     | 107 +-------------------------------
 arrow-flight/src/lib.rs        |   1 +
 arrow-flight/src/sql/client.rs |   2 +-
 arrow-flight/src/streams.rs    | 134 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 138 insertions(+), 106 deletions(-)

diff --git a/arrow-flight/src/client.rs b/arrow-flight/src/client.rs
index af3c8fba30..97d9899a9f 100644
--- a/arrow-flight/src/client.rs
+++ b/arrow-flight/src/client.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{pin::Pin, task::Poll};
-
 use crate::{
     decode::FlightRecordBatchStream,
     flight_service_client::FlightServiceClient,
@@ -28,16 +26,15 @@ use crate::{
 use arrow_schema::Schema;
 use bytes::Bytes;
 use futures::{
-    channel::oneshot::{Receiver, Sender},
     future::ready,
-    ready,
     stream::{self, BoxStream},
-    FutureExt, Stream, StreamExt, TryStreamExt,
+    Stream, StreamExt, TryStreamExt,
 };
 use prost::Message;
 use tonic::{metadata::MetadataMap, transport::Channel};
 
 use crate::error::{FlightError, Result};
+use crate::streams::{FallibleRequestStream, FallibleTonicResponseStream};
 
 /// A "Mid level" [Apache Arrow 
Flight](https://arrow.apache.org/docs/format/Flight.html) client.
 ///
@@ -674,103 +671,3 @@ impl FlightClient {
         request
     }
 }
-
-/// Wrapper around fallible stream such that when
-/// it encounters an error it uses the oneshot sender to
-/// notify the error and stop any further streaming. See `do_put` or
-/// `do_exchange` for it's uses.
-pub(crate) struct FallibleRequestStream<T, E> {
-    /// sender to notify error
-    sender: Option<Sender<E>>,
-    /// fallible stream
-    fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + 
Send + 'static>>,
-}
-
-impl<T, E> FallibleRequestStream<T, E> {
-    pub(crate) fn new(
-        sender: Sender<E>,
-        fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> 
+ Send + 'static>>,
-    ) -> Self {
-        Self {
-            sender: Some(sender),
-            fallible_stream,
-        }
-    }
-}
-
-impl<T, E> Stream for FallibleRequestStream<T, E> {
-    type Item = T;
-
-    fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        let pinned = self.get_mut();
-        let mut request_streams = pinned.fallible_stream.as_mut();
-        match ready!(request_streams.poll_next_unpin(cx)) {
-            Some(Ok(data)) => Poll::Ready(Some(data)),
-            Some(Err(e)) => {
-                // in theory this should only ever be called once
-                // as this stream should not be polled again after returning
-                // None, however we still check for None to be safe
-                if let Some(sender) = pinned.sender.take() {
-                    // an error means the other end of the channel is not 
around
-                    // to receive the error, so ignore it
-                    let _ = sender.send(e);
-                }
-                Poll::Ready(None)
-            }
-            None => Poll::Ready(None),
-        }
-    }
-}
-
-/// Wrapper for a tonic response stream that can produce a tonic
-/// error. This is tied to a oneshot receiver which can be notified
-/// of other errors. When it receives an error through receiver
-/// end, it prioritises that error to be sent back. See `do_put` or
-/// `do_exchange` for it's uses
-struct FallibleTonicResponseStream<T> {
-    /// Receiver for FlightError
-    receiver: Receiver<FlightError>,
-    /// Tonic response stream
-    response_stream:
-        Pin<Box<dyn Stream<Item = std::result::Result<T, tonic::Status>> + 
Send + 'static>>,
-}
-
-impl<T> FallibleTonicResponseStream<T> {
-    fn new(
-        receiver: Receiver<FlightError>,
-        response_stream: Pin<
-            Box<dyn Stream<Item = std::result::Result<T, tonic::Status>> + 
Send + 'static>,
-        >,
-    ) -> Self {
-        Self {
-            receiver,
-            response_stream,
-        }
-    }
-}
-
-impl<T> Stream for FallibleTonicResponseStream<T> {
-    type Item = Result<T>;
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let pinned = self.get_mut();
-        let receiver = &mut pinned.receiver;
-        // Prioritise sending the error that's been notified over
-        // polling the response_stream
-        if let Poll::Ready(Ok(err)) = receiver.poll_unpin(cx) {
-            return Poll::Ready(Some(Err(err)));
-        };
-
-        match ready!(pinned.response_stream.poll_next_unpin(cx)) {
-            Some(Ok(res)) => Poll::Ready(Some(Ok(res))),
-            Some(Err(status)) => 
Poll::Ready(Some(Err(FlightError::Tonic(status)))),
-            None => Poll::Ready(None),
-        }
-    }
-}
diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs
index 8fa61b1d57..1180264e5d 100644
--- a/arrow-flight/src/lib.rs
+++ b/arrow-flight/src/lib.rs
@@ -120,6 +120,7 @@ pub mod utils;
 
 #[cfg(feature = "flight-sql-experimental")]
 pub mod sql;
+mod streams;
 
 use flight_descriptor::DescriptorType;
 
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index 345254a63a..ef52aa27ef 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -24,7 +24,6 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use tonic::metadata::AsciiMetadataKey;
 
-use crate::client::FallibleRequestStream;
 use crate::decode::FlightRecordBatchStream;
 use crate::encode::FlightDataEncoderBuilder;
 use crate::error::FlightError;
@@ -43,6 +42,7 @@ use crate::sql::{
     CommandStatementIngest, CommandStatementQuery, CommandStatementUpdate,
     DoPutPreparedStatementResult, DoPutUpdateResult, ProstMessageExt, SqlInfo,
 };
+use crate::streams::FallibleRequestStream;
 use crate::trailers::extract_lazy_trailers;
 use crate::{
     Action, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, 
HandshakeResponse,
diff --git a/arrow-flight/src/streams.rs b/arrow-flight/src/streams.rs
new file mode 100644
index 0000000000..e532a80e1e
--- /dev/null
+++ b/arrow-flight/src/streams.rs
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FallibleRequestStream`] and [`FallibleTonicResponseStream`] adapters
+
+use crate::error::FlightError;
+use futures::{
+    channel::oneshot::{Receiver, Sender},
+    FutureExt, Stream, StreamExt,
+};
+use std::pin::Pin;
+use std::task::{ready, Poll};
+
+/// Wrapper around a fallible stream (one that returns errors) that makes it 
infallible.
+///
+/// Any errors encountered in the stream are ignored are sent to the provided
+/// oneshot sender.
+///
+/// This can be used to accept a stream of `Result<_>` from a client API and 
send
+/// them to the remote server that wants only the successful results.
+pub(crate) struct FallibleRequestStream<T, E> {
+    /// sender to notify error
+    sender: Option<Sender<E>>,
+    /// fallible stream
+    fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + 
Send + 'static>>,
+}
+
+impl<T, E> FallibleRequestStream<T, E> {
+    pub(crate) fn new(
+        sender: Sender<E>,
+        fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> 
+ Send + 'static>>,
+    ) -> Self {
+        Self {
+            sender: Some(sender),
+            fallible_stream,
+        }
+    }
+}
+
+impl<T, E> Stream for FallibleRequestStream<T, E> {
+    type Item = T;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let pinned = self.get_mut();
+        let mut request_streams = pinned.fallible_stream.as_mut();
+        match ready!(request_streams.poll_next_unpin(cx)) {
+            Some(Ok(data)) => Poll::Ready(Some(data)),
+            Some(Err(e)) => {
+                // in theory this should only ever be called once
+                // as this stream should not be polled again after returning
+                // None, however we still check for None to be safe
+                if let Some(sender) = pinned.sender.take() {
+                    // an error means the other end of the channel is not 
around
+                    // to receive the error, so ignore it
+                    let _ = sender.send(e);
+                }
+                Poll::Ready(None)
+            }
+            None => Poll::Ready(None),
+        }
+    }
+}
+
+/// Wrapper for a tonic response stream that maps errors to `FlightError` and
+/// returns errors from a oneshot channel into the stream.
+///
+/// The user of this stream can inject an error into the response stream using
+/// the one shot receiver. This is used to propagate errors in
+/// [`FlightClient::do_put`] and [`FlightClient::do_exchange`] from the client
+/// provided input stream to the response stream.
+///
+/// # Error Priority
+/// Error from the receiver are prioritised over the response stream.
+///
+/// [`FlightClient::do_put`]: crate::FlightClient::do_put
+/// [`FlightClient::do_exchange`]: crate::FlightClient::do_exchange
+pub(crate) struct FallibleTonicResponseStream<T> {
+    /// Receiver for FlightError
+    receiver: Receiver<FlightError>,
+    /// Tonic response stream
+    response_stream: Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + 
Send + 'static>>,
+}
+
+impl<T> FallibleTonicResponseStream<T> {
+    pub(crate) fn new(
+        receiver: Receiver<FlightError>,
+        response_stream: Pin<Box<dyn Stream<Item = Result<T, tonic::Status>> + 
Send + 'static>>,
+    ) -> Self {
+        Self {
+            receiver,
+            response_stream,
+        }
+    }
+}
+
+impl<T> Stream for FallibleTonicResponseStream<T> {
+    type Item = Result<T, FlightError>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        let pinned = self.get_mut();
+        let receiver = &mut pinned.receiver;
+        // Prioritise sending the error that's been notified over
+        // polling the response_stream
+        if let Poll::Ready(Ok(err)) = receiver.poll_unpin(cx) {
+            return Poll::Ready(Some(Err(err)));
+        };
+
+        match ready!(pinned.response_stream.poll_next_unpin(cx)) {
+            Some(Ok(res)) => Poll::Ready(Some(Ok(res))),
+            Some(Err(status)) => 
Poll::Ready(Some(Err(FlightError::Tonic(status)))),
+            None => Poll::Ready(None),
+        }
+    }
+}

Reply via email to