alamb commented on code in PR #7492:
URL: https://github.com/apache/arrow-rs/pull/7492#discussion_r2086946791
##########
arrow-flight/src/sql/server.rs:
##########
@@ -710,10 +710,21 @@ where
// we wrap this stream in a `Peekable` one, which allows us to peek at
// the first message without discarding it.
let mut request = request.map(PeekableFlightDataStream::new);
- let cmd = Pin::new(request.get_mut()).peek().await.unwrap().clone()?;
+ let mut stream = Pin::new(request.get_mut());
+
+ // If the stream is empty or the first item is an Err,
+ // return early with 0 records affected
+ let peeked_item = stream.peek().await.cloned();
+ let Some(cmd) = peeked_item else {
+ let result = DoPutUpdateResult { record_count: 0 };
+ let output = futures::stream::iter(vec![Ok(PutResult {
+ app_metadata: result.encode_to_vec().into(),
+ })]);
+ return Ok(Response::new(Box::pin(output)));
+ };
let message =
-
Any::decode(&*cmd.flight_descriptor.unwrap().cmd).map_err(decode_error_to_status)?;
+
Any::decode(&*cmd?.flight_descriptor.unwrap().cmd).map_err(decode_error_to_status)?;
Review Comment:
Not introduced by this PR, but I think this other `unwrap` will still panic
if a client sends malformed input (a message with no `flight_descriptor`).
##########
arrow-flight/tests/flight_sql_client.rs:
##########
@@ -116,6 +116,53 @@ pub async fn test_execute_ingest_error() {
);
}
+#[tokio::test]
+pub async fn test_execute_ingest_empty_stream() {
+ // Test for https://github.com/apache/arrow-rs/issues/7329
+
+ let test_server = FlightSqlServiceImpl::new();
+ let fixture = TestFixture::new(test_server.service()).await;
+ let channel = fixture.channel().await;
+ let mut flight_sql_client = FlightSqlServiceClient::new(channel);
+ let cmd = make_ingest_command();
+
+ // make sure there were no records ingested
+ let actual_rows = flight_sql_client
+ .execute_ingest(cmd, futures::stream::iter(vec![]).map(Ok))
+ .await
+ .expect("ingest should succeed");
+ assert_eq!(actual_rows, 0);
+
+ // make sure there were no batches sent to the server
+ let ingested_batches = test_server.ingested_batches.lock().await.clone();
+ assert_eq!(ingested_batches, vec![]);
+}
+
+#[tokio::test]
+pub async fn test_execute_ingest_first_item_error() {
+ // Test for https://github.com/apache/arrow-rs/issues/7329
+
+ let test_server = FlightSqlServiceImpl::new();
+ let fixture = TestFixture::new(test_server.service()).await;
+ let channel = fixture.channel().await;
+ let mut flight_sql_client = FlightSqlServiceClient::new(channel);
+ let cmd = make_ingest_command();
+
+ // send an error from the client
+ let batches = vec![Err(FlightError::NotYetImplemented(
+ "Client error message".to_string(),
+ ))];
+
+ // make sure the client error was returned
+ let err = flight_sql_client
+ .execute_ingest(cmd, futures::stream::iter(batches))
+ .await
+ .unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "External error: Not yet implemented: Client error message"
Review Comment:
I think this error never makes it to the server -- it is handled on the client
side in
https://github.com/apache/arrow-rs/blob/87ff531f0b4e109c2166c548d20c850e6b2c8fc5/arrow-flight/src/sql/client.rs#L269-L268
I think you'll have to create a `FlightClient` manually and send the error
directly to exercise the server code.
##########
arrow-flight/src/sql/server.rs:
##########
@@ -710,10 +710,21 @@ where
// we wrap this stream in a `Peekable` one, which allows us to peek at
// the first message without discarding it.
let mut request = request.map(PeekableFlightDataStream::new);
- let cmd = Pin::new(request.get_mut()).peek().await.unwrap().clone()?;
+ let mut stream = Pin::new(request.get_mut());
+
+ // If the stream is empty or the first item is an Err,
+ // return early with 0 records affected
+ let peeked_item = stream.peek().await.cloned();
+ let Some(cmd) = peeked_item else {
+ let result = DoPutUpdateResult { record_count: 0 };
+ let output = futures::stream::iter(vec![Ok(PutResult {
+ app_metadata: result.encode_to_vec().into(),
+ })]);
+ return Ok(Response::new(Box::pin(output)));
Review Comment:
If the client is not sending a command, it is probably making a mistake, and
returning an `Ok` will mask the problem rather than failing fast.
Rather than returning an `Ok`, I recommend invoking `do_put_fallback` in this
case, which will error by default and lets users override the behavior if they
wish.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]