tustvold commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319797124
##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
);
}
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Review Comment:
Do we need the multi-threaded runtime flavor here?
##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
/// Implementors may override to handle additional calls to do_put()
async fn do_put_fallback(
&self,
- _request: Request<Streaming<FlightData>>,
+ _request: Request<Peekable<Streaming<FlightData>>>,
Review Comment:
Another option might be to pass the first ticket request as a separate
argument. I don't feel strongly either way.
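To make the alternative concrete, here is a rough synchronous sketch of the "separate argument" shape. It uses only `std` iterators in place of the async `Streaming<FlightData>`, and `PutMessage`, `dispatch`, and `fallback` are hypothetical names for illustration, not the crate's API.

```rust
/// Hypothetical stand-in for `FlightData` (not the real type).
#[derive(Debug, Clone, PartialEq)]
struct PutMessage {
    descriptor: Option<String>,
    payload: Vec<u8>,
}

/// The dispatcher consumes the first message to decide where to route,
/// then hands that message to the handler explicitly so it is not lost.
fn dispatch<I: Iterator<Item = PutMessage>>(mut stream: I) -> Result<usize, String> {
    let first = stream.next().ok_or_else(|| "empty stream".to_string())?;
    fallback(first, stream)
}

/// Hypothetical fallback handler: receives the first message as its own
/// argument, followed by the rest of the stream.
fn fallback<I: Iterator<Item = PutMessage>>(first: PutMessage, rest: I) -> Result<usize, String> {
    // Re-attach the first message in front of the remaining ones and
    // sum the payload sizes as a placeholder for real processing.
    Ok(std::iter::once(first)
        .chain(rest)
        .map(|m| m.payload.len())
        .sum())
}
```

The trade-off is that every handler signature grows an extra parameter, whereas the peekable wrapper keeps a single stream argument.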
##########
arrow-flight/src/sql/server.rs:
##########
@@ -688,9 +688,17 @@ where
async fn do_put(
&self,
- mut request: Request<Streaming<FlightData>>,
+ request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
- let cmd = request.get_mut().message().await?.unwrap();
+ // See issue #4658: https://github.com/apache/arrow-rs/issues/4658
+ // To dispatch to the correct `do_put` method, we cannot discard the first message,
+ // as it may contain the Arrow schema, which the `do_put` handler may need.
+ // To allow the first message to be reused by the `do_put` handler,
+ // we wrap this stream in a `Peekable` one, which allows us to peek at
+ // the first message without discarding it.
+ let mut request = request.map(futures::StreamExt::peekable);
Review Comment:
So if I am following correctly, the issue is that `do_put` accepts a
`FlightData` stream, but the first request in the stream will contain a
`FlightDescriptor` in addition to potentially any data. I continue to be
utterly baffled by the design of Flight :sweat_smile:
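For anyone else following along, a minimal synchronous analogy of the peek-then-forward pattern. It uses `std::iter::Peekable` as a stand-in for the async `Peekable` stream in the diff, and `Message`, `route`, `total_payload_len`, and `sample_stream` are hypothetical names, not part of arrow-flight.

```rust
use std::iter::Peekable;

/// Hypothetical stand-in for `FlightData`: only the first message of a
/// stream carries a descriptor, but it may also carry a payload.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    descriptor: Option<String>,
    payload: Vec<u8>,
}

/// Inspect the first message to choose a handler, without consuming it.
fn route<I: Iterator<Item = Message>>(stream: &mut Peekable<I>) -> Option<String> {
    stream.peek().and_then(|first| first.descriptor.clone())
}

/// A handler that still needs every message, including the first one.
fn total_payload_len<I: Iterator<Item = Message>>(stream: I) -> usize {
    stream.map(|m| m.payload.len()).sum()
}

/// Hypothetical two-message stream used for illustration.
fn sample_stream() -> Vec<Message> {
    vec![
        Message {
            descriptor: Some("prepared_statement_query".to_string()),
            payload: vec![1, 2, 3],
        },
        Message {
            descriptor: None,
            payload: vec![4, 5],
        },
    ]
}
```

Because `route` only peeks, the handler called afterwards still sees the payload of the first message.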
##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
Ok(())
}
+ /// Submit parameters to the server, if any have been set on this prepared statement instance
+ async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+ if let Some(ref params_batch) = self.parameter_binding {
+ let cmd = CommandPreparedStatementQuery {
+ prepared_statement_handle: self.handle.clone(),
+ };
+
+ let descriptor = FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+ let flight_stream_builder = FlightDataEncoderBuilder::new()
+ .with_flight_descriptor(Some(descriptor))
+ .with_schema(params_batch.schema());
+ let flight_data = flight_stream_builder
+ .build(futures::stream::iter(
+ self.parameter_binding.clone().map(Ok),
+ ))
+ .try_collect::<Vec<_>>()
+ .await
+ .map_err(flight_error_to_arrow_error)?;
+
+ self.flight_sql_client
Review Comment:
This appears consistent with the FlightSQL specification: it uses `do_put` to
bind the parameter arguments. What isn't clear to me is whether the result
should be used in some way.
This would seem to imply some sort of server-side state, which I had perhaps
expected FlightSQL not to rely on.
##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
ArrowError::IpcError(format!("{status:?}"))
}
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+ match err {
+ FlightError::Arrow(e) => e,
+ FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+ FlightError::Tonic(status) => status_to_arrow_error(status),
+ FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+ FlightError::DecodeError(s) => ArrowError::IpcError(s),
+ FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+ }
Review Comment:
```suggestion
match err {
FlightError::Arrow(e) => e,
e => ArrowError::ExternalError(Box::new(e))
}
```
Tbh this could probably be added as a From impl
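A rough sketch of what the `From` impl could look like. The two enums here are simplified, hypothetical stand-ins defined locally (the real `ArrowError` and `FlightError` have more variants), and `decode` and `caller` are made-up helpers that only show how `?` would pick up the conversion.

```rust
use std::error::Error;
use std::fmt;

/// Simplified stand-in for the real `ArrowError` (assumption: two variants only).
#[derive(Debug)]
enum ArrowError {
    IpcError(String),
    ExternalError(Box<dyn Error + Send + Sync>),
}

/// Simplified stand-in for the real `FlightError` (assumption: two variants only).
#[derive(Debug)]
enum FlightError {
    Arrow(ArrowError),
    ProtocolError(String),
}

impl fmt::Display for ArrowError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}
impl Error for ArrowError {}

impl fmt::Display for FlightError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}
impl Error for FlightError {}

impl From<FlightError> for ArrowError {
    fn from(err: FlightError) -> Self {
        match err {
            // Unwrap an inner Arrow error instead of double-wrapping it.
            FlightError::Arrow(e) => e,
            // Box everything else as an external error.
            e => ArrowError::ExternalError(Box::new(e)),
        }
    }
}

/// Hypothetical fallible operation returning `FlightError`.
fn decode(input: u8) -> Result<u32, FlightError> {
    match input {
        0 => Ok(41),
        1 => Err(FlightError::Arrow(ArrowError::IpcError("bad ipc".into()))),
        _ => Err(FlightError::ProtocolError("bad frame".into())),
    }
}

/// With the `From` impl in place, `?` converts the error automatically.
fn caller(input: u8) -> Result<u32, ArrowError> {
    Ok(decode(input)? + 1)
}
```

With this in place the call site would no longer need an explicit `.map_err(...)`.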
##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> ArrowError {
ArrowError::IpcError(format!("{status:?}"))
}
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+ match err {
+ FlightError::Arrow(e) => e,
+ FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+ FlightError::Tonic(status) => status_to_arrow_error(status),
+ FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+ FlightError::DecodeError(s) => ArrowError::IpcError(s),
+ FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+ }
+}
Review Comment:
I agree this should probably be FlightError