tustvold commented on code in PR #3402:
URL: https://github.com/apache/arrow-rs/pull/3402#discussion_r1061348846
##########
arrow-flight/src/client.rs:
##########
@@ -256,13 +258,273 @@ impl FlightClient {
Ok(response)
}
+ /// Make a `DoPut` call to the server with the provided
+ /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+ /// stream of [`PutResult`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::{TryStreamExt, StreamExt};
+ /// # use std::sync::Arc;
+ /// # use arrow_array::UInt64Array;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+ /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+ /// # let batch = RecordBatch::try_from_iter(vec![
+ /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+ /// # ]).unwrap();
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // encode the batch as a stream of `FlightData`
+ /// let flight_data_stream = FlightDataEncoderBuilder::new()
Review Comment:
It's probably a bit late now, but looking at this API in use, it is kind of
a little strange Stream isn't mentioned at all in the name. i.e.
`s/Encoder/Stream` or something...
##########
arrow-flight/src/client.rs:
##########
@@ -256,13 +258,273 @@ impl FlightClient {
Ok(response)
}
+ /// Make a `DoPut` call to the server with the provided
+ /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+ /// stream of [`PutResult`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::{TryStreamExt, StreamExt};
+ /// # use std::sync::Arc;
+ /// # use arrow_array::UInt64Array;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+ /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+ /// # let batch = RecordBatch::try_from_iter(vec![
+ /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+ /// # ]).unwrap();
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // encode the batch as a stream of `FlightData`
+ /// let flight_data_stream = FlightDataEncoderBuilder::new()
+ /// .build(futures::stream::iter(vec![Ok(batch)]))
+ /// // data encoder return Results, but do_put requires FlightData
Review Comment:
Perhaps this should accept a `Stream<Item=Result<FlightData>>`, otherwise
I'm not sure how one could use this to stream a fallible computation without
panicking?
##########
arrow-flight/tests/client.rs:
##########
@@ -259,6 +232,248 @@ async fn test_do_get_error_in_record_batch_stream() {
.await;
}
+#[tokio::test]
+async fn test_do_put() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+
+ let expected_response = vec![
+ PutResult {
+ app_metadata: Bytes::from("foo"),
+ },
+ PutResult {
+ app_metadata: Bytes::from("bar"),
+ },
+ ];
+
+ test_server
+
.set_do_put_response(expected_response.clone().into_iter().map(Ok).collect());
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ assert_eq!(response, expected_response);
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_put response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = vec![
+ Ok(PutResult {
+ app_metadata: Bytes::from("foo"),
+ }),
+ Err(FlightError::Tonic(Status::invalid_argument("bad arg"))),
+ ];
+
+ test_server.set_do_put_response(response);
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
Review Comment:
unwrap_err? Or is this some lack of Debug issue?
##########
arrow-flight/src/client.rs:
##########
@@ -256,13 +258,273 @@ impl FlightClient {
Ok(response)
}
+ /// Make a `DoPut` call to the server with the provided
+ /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+ /// stream of [`PutResult`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::{TryStreamExt, StreamExt};
+ /// # use std::sync::Arc;
+ /// # use arrow_array::UInt64Array;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+ /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+ /// # let batch = RecordBatch::try_from_iter(vec![
+ /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+ /// # ]).unwrap();
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // encode the batch as a stream of `FlightData`
+ /// let flight_data_stream = FlightDataEncoderBuilder::new()
+ /// .build(futures::stream::iter(vec![Ok(batch)]))
+ /// // data encoder return Results, but do_put requires FlightData
+ /// .map(|batch|batch.unwrap());
+ ///
+ /// // send the stream and get the results as `PutResult`
+ /// let response: Vec<PutResult>= client
+ /// .do_put(flight_data_stream)
+ /// .await
+ /// .unwrap()
+ /// .try_collect() // use TryStreamExt to collect stream
+ /// .await
+ /// .expect("error calling do_put");
+ /// # }
+ /// ```
+ pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
+ &mut self,
+ request: S,
+ ) -> Result<BoxStream<'static, Result<PutResult>>> {
+ let request = self.make_request(request);
+
+ let response = self
+ .inner
+ .do_put(request)
+ .await?
+ .into_inner()
+ .map_err(FlightError::Tonic);
+
+ Ok(response.boxed())
+ }
+
+ /// Make a `DoExchange` call to the server with the provided
+ /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+ /// stream of [`FlightData`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::{TryStreamExt, StreamExt};
+ /// # use std::sync::Arc;
+ /// # use arrow_array::UInt64Array;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+ /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+ /// # let batch = RecordBatch::try_from_iter(vec![
+ /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+ /// # ]).unwrap();
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // encode the batch as a stream of `FlightData`
+ /// let flight_data_stream = FlightDataEncoderBuilder::new()
+ /// .build(futures::stream::iter(vec![Ok(batch)]))
+ /// // data encoder return Results, but do_put requires FlightData
+ /// .map(|batch|batch.unwrap());
+ ///
+ /// // send the stream and get the results as `RecordBatches`
+ /// let response: Vec<RecordBatch> = client
+ /// .do_exchange(flight_data_stream)
+ /// .await
+ /// .unwrap()
+ /// .try_collect() // use TryStreamExt to collect stream
+ /// .await
+ /// .expect("error calling do_put");
Review Comment:
```suggestion
/// .expect("error calling do_exchange");
```
Tbh I think unwrap is perfectly valid here, this isn't really adding any
additional context not already provided by the line number
##########
arrow-flight/tests/client.rs:
##########
@@ -259,6 +232,248 @@ async fn test_do_get_error_in_record_batch_stream() {
.await;
}
+#[tokio::test]
+async fn test_do_put() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+
+ let expected_response = vec![
+ PutResult {
+ app_metadata: Bytes::from("foo"),
+ },
+ PutResult {
+ app_metadata: Bytes::from("bar"),
+ },
+ ];
+
+ test_server
+
.set_do_put_response(expected_response.clone().into_iter().map(Ok).collect());
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ assert_eq!(response, expected_response);
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_put response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = vec![
+ Ok(PutResult {
+ app_metadata: Bytes::from("foo"),
+ }),
+ Err(FlightError::Tonic(Status::invalid_argument("bad arg"))),
+ ];
+
+ test_server.set_do_put_response(response);
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::invalid_argument("bad arg");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+ let output_flight_data = test_flight_data2().await;
+
+ test_server.set_do_exchange_response(
+ output_flight_data.clone().into_iter().map(Ok).collect(),
+ );
+
+ let response_stream = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ let expected_stream =
futures::stream::iter(output_flight_data).map(Ok);
+
+ let expected_batches: Vec<_> =
+ FlightRecordBatchStream::new_from_flight_data(expected_stream)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(response, expected_batches);
+ assert_eq!(
+ test_server.take_do_exchange_request(),
+ Some(input_flight_data)
+ );
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_exchange response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(
+ test_server.take_do_exchange_request(),
+ Some(input_flight_data)
+ );
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = test_flight_data2()
+ .await
+ .into_iter()
+ .enumerate()
+ .map(|(i, m)| {
+ if i == 0 {
+ Ok(m)
+ } else {
+ // make all messages after the first an error
+ let e = tonic::Status::invalid_argument("the error");
+ Err(FlightError::Tonic(e))
+ }
+ })
+ .collect();
+
+ test_server.set_do_exchange_response(response);
+
+ let response_stream = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
+ let response = match response {
Review Comment:
unwrap_err
##########
arrow-flight/src/client.rs:
##########
@@ -256,13 +258,273 @@ impl FlightClient {
Ok(response)
}
+ /// Make a `DoPut` call to the server with the provided
+ /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+ /// stream of [`PutResult`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::{TryStreamExt, StreamExt};
+ /// # use std::sync::Arc;
+ /// # use arrow_array::UInt64Array;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+ /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+ /// # let batch = RecordBatch::try_from_iter(vec![
+ /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+ /// # ]).unwrap();
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // encode the batch as a stream of `FlightData`
+ /// let flight_data_stream = FlightDataEncoderBuilder::new()
+ /// .build(futures::stream::iter(vec![Ok(batch)]))
+ /// // data encoder return Results, but do_put requires FlightData
+ /// .map(|batch|batch.unwrap());
+ ///
+ /// // send the stream and get the results as `PutResult`
+ /// let response: Vec<PutResult>= client
+ /// .do_put(flight_data_stream)
+ /// .await
+ /// .unwrap()
+ /// .try_collect() // use TryStreamExt to collect stream
+ /// .await
+ /// .expect("error calling do_put");
+ /// # }
+ /// ```
+ pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
+ &mut self,
+ request: S,
+ ) -> Result<BoxStream<'static, Result<PutResult>>> {
+ let request = self.make_request(request);
+
+ let response = self
+ .inner
+ .do_put(request)
+ .await?
+ .into_inner()
+ .map_err(FlightError::Tonic);
+
+ Ok(response.boxed())
+ }
+
+ /// Make a `DoExchange` call to the server with the provided
+ /// [`Stream`](futures::Stream) of [`FlightData`] and returning a
+ /// stream of [`FlightData`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::{TryStreamExt, StreamExt};
+ /// # use std::sync::Arc;
+ /// # use arrow_array::UInt64Array;
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
+ /// # use arrow_flight::encode::FlightDataEncoderBuilder;
+ /// # let batch = RecordBatch::try_from_iter(vec![
+ /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
+ /// # ]).unwrap();
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // encode the batch as a stream of `FlightData`
+ /// let flight_data_stream = FlightDataEncoderBuilder::new()
+ /// .build(futures::stream::iter(vec![Ok(batch)]))
+ /// // data encoder return Results, but do_put requires FlightData
+ /// .map(|batch|batch.unwrap());
+ ///
+ /// // send the stream and get the results as `RecordBatches`
+ /// let response: Vec<RecordBatch> = client
+ /// .do_exchange(flight_data_stream)
+ /// .await
+ /// .unwrap()
+ /// .try_collect() // use TryStreamExt to collect stream
+ /// .await
+ /// .expect("error calling do_put");
+ /// # }
+ /// ```
+ pub async fn do_exchange<S: Stream<Item = FlightData> + Send + 'static>(
+ &mut self,
+ request: S,
+ ) -> Result<FlightRecordBatchStream> {
+ let request = self.make_request(request);
+
+ let response = self
+ .inner
+ .do_exchange(request)
+ .await?
+ .into_inner()
+ .map_err(FlightError::Tonic);
+
+ Ok(FlightRecordBatchStream::new_from_flight_data(response))
+ }
+
+ /// Make a `ListFlights` call to the server with the provided
+ /// critera and returns a [`Stream`](futures::Stream) of [`FlightInfo`].
+ ///
+ /// # Example:
+ /// ```no_run
+ /// # async fn run() {
+ /// # use futures::TryStreamExt;
+ /// # use bytes::Bytes;
+ /// # use arrow_flight::{FlightInfo, FlightClient};
+ /// # let channel: tonic::transport::Channel = unimplemented!();
+ /// let mut client = FlightClient::new(channel);
+ ///
+ /// // Send 'Name=Foo' bytes as the "expression" to the server
+ /// // and gather the returned FlightInfo
+ /// let responses: Vec<FlightInfo> = client
+ /// .list_flights(Bytes::from("Name=Foo"))
+ /// .await
+ /// .expect("error listing flights")
+ /// .try_collect() // use TryStreamExt to collect stream
+ /// .await
+ /// .expect("error gathering flights");
+ /// # }
+ /// ```
+ pub async fn list_flights(
+ &mut self,
+ expression: impl Into<Bytes>,
Review Comment:
Is the idea a higher-level FlightSQL client will handle the Any encoding
nonsense?
##########
arrow-flight/tests/client.rs:
##########
@@ -259,6 +232,248 @@ async fn test_do_get_error_in_record_batch_stream() {
.await;
}
+#[tokio::test]
+async fn test_do_put() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+
+ let expected_response = vec![
+ PutResult {
+ app_metadata: Bytes::from("foo"),
Review Comment:
```suggestion
app_metadata: Bytes::from("hello"),
```
Initially I got confused thinking this was matching the header
##########
arrow-flight/tests/common/server.rs:
##########
@@ -243,8 +298,22 @@ impl FlightService for TestFlightServer {
async fn do_exchange(
&self,
- _request: Request<Streaming<FlightData>>,
+ request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
- Err(Status::unimplemented("Implement do_exchange"))
+ self.save_metadata(&request);
+ let do_exchange_request: Vec<_> =
request.into_inner().try_collect().await?;
+
+ let mut state = self.state.lock().expect("mutex not poisoned");
+
+ state.do_exchange_request = Some(do_exchange_request);
+
+ let response = state
+ .do_exchange_response
+ .take()
+ .ok_or_else(|| Status::internal("No do_exchange response
configured"))?;
+
+ let stream = futures::stream::iter(response).map_err(|e| e.into());
+
+ Ok(Response::new(Box::pin(stream) as _))
Review Comment:
```suggestion
let stream = futures::stream::iter(response).map_err(Into::into);
Ok(Response::new(stream.boxed()))
```
##########
arrow-flight/tests/client.rs:
##########
@@ -259,6 +232,248 @@ async fn test_do_get_error_in_record_batch_stream() {
.await;
}
+#[tokio::test]
+async fn test_do_put() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+
+ let expected_response = vec![
+ PutResult {
+ app_metadata: Bytes::from("foo"),
+ },
+ PutResult {
+ app_metadata: Bytes::from("bar"),
+ },
+ ];
+
+ test_server
+
.set_do_put_response(expected_response.clone().into_iter().map(Ok).collect());
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ assert_eq!(response, expected_response);
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_put response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = vec![
+ Ok(PutResult {
+ app_metadata: Bytes::from("foo"),
+ }),
+ Err(FlightError::Tonic(Status::invalid_argument("bad arg"))),
+ ];
+
+ test_server.set_do_put_response(response);
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::invalid_argument("bad arg");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+ let output_flight_data = test_flight_data2().await;
+
+ test_server.set_do_exchange_response(
+ output_flight_data.clone().into_iter().map(Ok).collect(),
+ );
+
+ let response_stream = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ let expected_stream =
futures::stream::iter(output_flight_data).map(Ok);
+
+ let expected_batches: Vec<_> =
+ FlightRecordBatchStream::new_from_flight_data(expected_stream)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(response, expected_batches);
+ assert_eq!(
+ test_server.take_do_exchange_request(),
+ Some(input_flight_data)
+ );
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
Review Comment:
unwrap_err
##########
arrow-flight/tests/common/server.rs:
##########
@@ -222,9 +263,23 @@ impl FlightService for TestFlightServer {
async fn do_put(
&self,
- _request: Request<Streaming<FlightData>>,
+ request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
- Err(Status::unimplemented("Implement do_put"))
+ self.save_metadata(&request);
+ let do_put_request: Vec<_> = request.into_inner().try_collect().await?;
+
+ let mut state = self.state.lock().expect("mutex not poisoned");
+
+ state.do_put_request = Some(do_put_request);
+
+ let response = state
+ .do_put_response
+ .take()
+ .ok_or_else(|| Status::internal("No do_put response configured"))?;
+
+ let stream = futures::stream::iter(response).map_err(|e| e.into());
Review Comment:
```suggestion
let stream = futures::stream::iter(response).map_err(Into::into);
```
##########
arrow-flight/tests/common/server.rs:
##########
@@ -222,9 +263,23 @@ impl FlightService for TestFlightServer {
async fn do_put(
&self,
- _request: Request<Streaming<FlightData>>,
+ request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
- Err(Status::unimplemented("Implement do_put"))
+ self.save_metadata(&request);
+ let do_put_request: Vec<_> = request.into_inner().try_collect().await?;
+
+ let mut state = self.state.lock().expect("mutex not poisoned");
+
+ state.do_put_request = Some(do_put_request);
+
+ let response = state
+ .do_put_response
+ .take()
+ .ok_or_else(|| Status::internal("No do_put response configured"))?;
+
+ let stream = futures::stream::iter(response).map_err(|e| e.into());
+
+ Ok(Response::new(Box::pin(stream) as _))
Review Comment:
```suggestion
Ok(Response::new(stream.boxed()))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]