alamb commented on code in PR #5728:
URL: https://github.com/apache/arrow-rs/pull/5728#discussion_r1592964323


##########
arrow-flight/src/client.rs:
##########
@@ -704,3 +674,99 @@ impl FlightClient {
         request
     }
 }
+
+/// Wrapper around fallible stream such that when
+/// it encounters an error it uses the oneshot sender to
+/// notify the error and stop any further streaming. See `do_put` or
+/// `do_exchange` for it's uses.
+struct FallibleRequestStream<T, E> {
+    /// sender to notify error
+    sender: Option<Sender<E>>,
+    /// fallible stream
+    fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + 
Send + 'static>>,
+}
+
+impl<T, E> FallibleRequestStream<T, E> {
+    fn new(
+        sender: Sender<E>,
+        fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> 
+ Send + 'static>>,
+    ) -> Self {
+        Self {
+            sender: Some(sender),
+            fallible_stream,
+        }
+    }
+}
+
+impl<T, E> Stream for FallibleRequestStream<T, E> {
+    type Item = T;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let pinned = self.get_mut();
+        let mut request_streams = pinned.fallible_stream.as_mut();
+        match ready!(request_streams.poll_next_unpin(cx)) {
+            Some(Ok(data)) => Poll::Ready(Some(data)),
+            Some(Err(e)) => {
+                // unwrap() here is safe, ownership of sender will

Review Comment:
   https://github.com/apache/arrow-rs/pull/5734



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to