alamb commented on code in PR #3402:
URL: https://github.com/apache/arrow-rs/pull/3402#discussion_r1062525546
##########
arrow-flight/tests/client.rs:
##########
@@ -259,6 +232,248 @@ async fn test_do_get_error_in_record_batch_stream() {
.await;
}
+#[tokio::test]
+async fn test_do_put() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+
+ let expected_response = vec![
+ PutResult {
+ app_metadata: Bytes::from("foo"),
+ },
+ PutResult {
+ app_metadata: Bytes::from("bar"),
+ },
+ ];
+
+ test_server
+ .set_do_put_response(expected_response.clone().into_iter().map(Ok).collect());
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ assert_eq!(response, expected_response);
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_put response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = vec![
+ Ok(PutResult {
+ app_metadata: Bytes::from("foo"),
+ }),
+ Err(FlightError::Tonic(Status::invalid_argument("bad arg"))),
+ ];
+
+ test_server.set_do_put_response(response);
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::invalid_argument("bad arg");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+ let output_flight_data = test_flight_data2().await;
+
+ test_server.set_do_exchange_response(
+ output_flight_data.clone().into_iter().map(Ok).collect(),
+ );
+
+ let response_stream = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ let expected_stream = futures::stream::iter(output_flight_data).map(Ok);
+
+ let expected_batches: Vec<_> =
+ FlightRecordBatchStream::new_from_flight_data(expected_stream)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(response, expected_batches);
+ assert_eq!(
+ test_server.take_do_exchange_request(),
+ Some(input_flight_data)
+ );
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
Review Comment:
as above
##########
arrow-flight/tests/common/server.rs:
##########
@@ -222,9 +263,23 @@ impl FlightService for TestFlightServer {
async fn do_put(
&self,
- _request: Request<Streaming<FlightData>>,
+ request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
- Err(Status::unimplemented("Implement do_put"))
+ self.save_metadata(&request);
+ let do_put_request: Vec<_> = request.into_inner().try_collect().await?;
+
+ let mut state = self.state.lock().expect("mutex not poisoned");
+
+ state.do_put_request = Some(do_put_request);
+
+ let response = state
+ .do_put_response
+ .take()
+ .ok_or_else(|| Status::internal("No do_put response configured"))?;
+
+ let stream = futures::stream::iter(response).map_err(|e| e.into());
+
+ Ok(Response::new(Box::pin(stream) as _))
Review Comment:
I applied these changes to the rest of this file
##########
arrow-flight/tests/client.rs:
##########
@@ -259,6 +232,248 @@ async fn test_do_get_error_in_record_batch_stream() {
.await;
}
+#[tokio::test]
+async fn test_do_put() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+
+ let expected_response = vec![
+ PutResult {
+ app_metadata: Bytes::from("foo"),
+ },
+ PutResult {
+ app_metadata: Bytes::from("bar"),
+ },
+ ];
+
+ test_server
+ .set_do_put_response(expected_response.clone().into_iter().map(Ok).collect());
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ assert_eq!(response, expected_response);
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_put response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_put_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = vec![
+ Ok(PutResult {
+ app_metadata: Bytes::from("foo"),
+ }),
+ Err(FlightError::Tonic(Status::invalid_argument("bad arg"))),
+ ];
+
+ test_server.set_do_put_response(response);
+
+ let response_stream = client
+ .do_put(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::invalid_argument("bad arg");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(test_server.take_do_put_request(), Some(input_flight_data));
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ // encode the batch as a stream of FlightData
+ let input_flight_data = test_flight_data().await;
+ let output_flight_data = test_flight_data2().await;
+
+ test_server.set_do_exchange_response(
+ output_flight_data.clone().into_iter().map(Ok).collect(),
+ );
+
+ let response_stream = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Vec<_> = response_stream
+ .try_collect()
+ .await
+ .expect("Error streaming data");
+
+ let expected_stream = futures::stream::iter(output_flight_data).map(Ok);
+
+ let expected_batches: Vec<_> =
+ FlightRecordBatchStream::new_from_flight_data(expected_stream)
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(response, expected_batches);
+ assert_eq!(
+ test_server.take_do_exchange_request(),
+ Some(input_flight_data)
+ );
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange_error() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await;
+ let response = match response {
+ Ok(_) => panic!("unexpected success"),
+ Err(e) => e,
+ };
+
+ let e = Status::internal("No do_exchange response configured");
+ expect_status(response, e);
+ // server still got the request
+ assert_eq!(
+ test_server.take_do_exchange_request(),
+ Some(input_flight_data)
+ );
+ ensure_metadata(&client, &test_server);
+ })
+ .await;
+}
+
+#[tokio::test]
+async fn test_do_exchange_error_stream() {
+ do_test(|test_server, mut client| async move {
+ client.add_header("foo", "bar").unwrap();
+
+ let input_flight_data = test_flight_data().await;
+
+ let response = test_flight_data2()
+ .await
+ .into_iter()
+ .enumerate()
+ .map(|(i, m)| {
+ if i == 0 {
+ Ok(m)
+ } else {
+ // make all messages after the first an error
+ let e = tonic::Status::invalid_argument("the error");
+ Err(FlightError::Tonic(e))
+ }
+ })
+ .collect();
+
+ test_server.set_do_exchange_response(response);
+
+ let response_stream = client
+ .do_exchange(futures::stream::iter(input_flight_data.clone()))
+ .await
+ .expect("error making request");
+
+ let response: Result<Vec<_>, _> = response_stream.try_collect().await;
+ let response = match response {
Review Comment:
same issue as above
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]