opensourcegeek commented on code in PR #5728:
URL: https://github.com/apache/arrow-rs/pull/5728#discussion_r1592973999
##########
arrow-flight/src/client.rs:
##########
@@ -704,3 +674,99 @@ impl FlightClient {
request
}
}
+
+/// Wrapper around fallible stream such that when
+/// it encounters an error it uses the oneshot sender to
+/// notify the error and stop any further streaming. See `do_put` or
+/// `do_exchange` for it's uses.
+struct FallibleRequestStream<T, E> {
+ /// sender to notify error
+ sender: Option<Sender<E>>,
+ /// fallible stream
+ fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>> +
Send + 'static>>,
+}
+
+impl<T, E> FallibleRequestStream<T, E> {
+ fn new(
+ sender: Sender<E>,
+ fallible_stream: Pin<Box<dyn Stream<Item = std::result::Result<T, E>>
+ Send + 'static>>,
+ ) -> Self {
+ Self {
+ sender: Some(sender),
+ fallible_stream,
+ }
+ }
+}
+
+impl<T, E> Stream for FallibleRequestStream<T, E> {
+ type Item = T;
+
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let pinned = self.get_mut();
+ let mut request_streams = pinned.fallible_stream.as_mut();
+ match ready!(request_streams.poll_next_unpin(cx)) {
+ Some(Ok(data)) => Poll::Ready(Some(data)),
+ Some(Err(e)) => {
+ // unwrap() here is safe, ownership of sender will
Review Comment:
Ah good point, it would've taken less time to swap the code than writing the
justification comments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]