alamb commented on code in PR #5728:
URL: https://github.com/apache/arrow-rs/pull/5728#discussion_r1592955729


##########
arrow-flight/src/client.rs:
##########
@@ -704,3 +674,99 @@ impl FlightClient {
         request
     }
 }
+
+/// Wrapper around fallible stream such that when
+/// it encounters an error it uses the oneshot sender to
+/// notify the error and stop any further streaming. See `do_put` or
+/// `do_exchange` for its uses.
+struct FallibleRequestStream<T, E> {
+    /// sender to notify error
+    sender: Option<Sender<E>>,
+    /// fallible stream
+    fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + Send + 'static>>,
+}
+
+impl<T, E> FallibleRequestStream<T, E> {
+    fn new(
+        sender: Sender<E>,
+        fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> + Send + 'static>>,
+    ) -> Self {
+        Self {
+            sender: Some(sender),
+            fallible_stream,
+        }
+    }
+}
+
+impl<T, E> Stream for FallibleRequestStream<T, E> {
+    type Item = T;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let pinned = self.get_mut();
+        let mut request_streams = pinned.fallible_stream.as_mut();
+        match ready!(request_streams.poll_next_unpin(cx)) {
+            Some(Ok(data)) => Poll::Ready(Some(data)),
+            Some(Err(e)) => {
+                // unwrap() here is safe, ownership of sender will

Review Comment:
   Another potential improvement might be to do something like
   
   ```rust
   if let Some(sender) = pinned.sender.take() {
       // Ignore the result: the receiver may already have been dropped.
       let _ = sender.send(e);
   }
   ```
   and that way avoid a panic.
   
   However, I see this unwrap is simply moved here from elsewhere in the PR, so I think we can improve it in the future.
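
To make the suggestion concrete, here is a minimal, dependency-free sketch of the `take()` pattern. It is not the code from this PR: `ErrorNotifier` and `notify` are hypothetical names, and `std::sync::mpsc::Sender` stands in for the oneshot sender so the example compiles with the standard library alone.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for the error-reporting half of
/// `FallibleRequestStream`; only the `Option<Sender<E>>` field is modeled.
struct ErrorNotifier<E> {
    /// `Some` until the first error is reported, `None` afterwards.
    sender: Option<Sender<E>>,
}

impl<E> ErrorNotifier<E> {
    fn new(sender: Sender<E>) -> Self {
        Self {
            sender: Some(sender),
        }
    }

    /// Reports `err` at most once. Returns `true` only if the error was
    /// actually delivered to the receiver.
    fn notify(&mut self, err: E) -> bool {
        // `take()` leaves `None` behind, so a second call falls through
        // to `false` instead of panicking the way `unwrap()` would.
        if let Some(sender) = self.sender.take() {
            // `send` fails only when the receiver has been dropped;
            // treat that as "not delivered" rather than an error.
            return sender.send(err).is_ok();
        }
        false
    }
}
```

With this shape, a second error after the sender has been consumed is silently dropped. Whether that or a logged warning is the right behavior for `do_put` and `do_exchange` is a design choice left open here.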



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
