tustvold commented on code in PR #4797:
URL: https://github.com/apache/arrow-rs/pull/4797#discussion_r1319797124


##########
arrow-flight/tests/flight_sql_client_cli.rs:
##########
@@ -87,10 +91,56 @@ async fn test_simple() {
     );
 }
 
+const PREPARED_QUERY: &str = "SELECT * FROM table WHERE field = $1";
+const PREPARED_STATEMENT_HANDLE: &str = "prepared_statement_handle";
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

Review Comment:
   Do we need multi thread here?



##########
arrow-flight/src/sql/server.rs:
##########
@@ -366,7 +366,7 @@ pub trait FlightSqlService: Sync + Send + Sized + 'static {
     /// Implementors may override to handle additional calls to do_put()
     async fn do_put_fallback(
         &self,
-        _request: Request<Streaming<FlightData>>,
+        _request: Request<Peekable<Streaming<FlightData>>>,

Review Comment:
   Another option might be to pass the first ticket request as a separate 
argument. I don't feel strongly either way



##########
arrow-flight/src/sql/server.rs:
##########
@@ -688,9 +688,17 @@ where
 
     async fn do_put(
         &self,
-        mut request: Request<Streaming<FlightData>>,
+        request: Request<Streaming<FlightData>>,
     ) -> Result<Response<Self::DoPutStream>, Status> {
-        let cmd = request.get_mut().message().await?.unwrap();
+        // See issue #4658: https://github.com/apache/arrow-rs/issues/4658
+        // To dispatch to the correct `do_put` method, we cannot discard the 
first message,
+        // as it may contain the Arrow schema, which the `do_put` handler may 
need.
+        // To allow the first message to be reused by the `do_put` handler,
+        // we wrap this stream in a `Peekable` one, which allows us to peek at
+        // the first message without discarding it.
+        let mut request = request.map(futures::StreamExt::peekable);

Review Comment:
   So if I am following correctly, the issue is do_put accepts a FlightData 
stream, but the first request  in the stream will contain a FlightDescriptor in 
addition to potentially any data. I continue to be utterly baffled by the 
design of Flight :sweat_smile: 
   
   
   
   



##########
arrow-flight/src/sql/client.rs:
##########
@@ -492,6 +499,36 @@ impl PreparedStatement<Channel> {
         Ok(())
     }
 
+    /// Submit parameters to the server, if any have been set on this prepared 
statement instance
+    async fn write_bind_params(&mut self) -> Result<(), ArrowError> {
+        if let Some(ref params_batch) = self.parameter_binding {
+            let cmd = CommandPreparedStatementQuery {
+                prepared_statement_handle: self.handle.clone(),
+            };
+
+            let descriptor = 
FlightDescriptor::new_cmd(cmd.as_any().encode_to_vec());
+            let flight_stream_builder = FlightDataEncoderBuilder::new()
+                .with_flight_descriptor(Some(descriptor))
+                .with_schema(params_batch.schema());
+            let flight_data = flight_stream_builder
+                .build(futures::stream::iter(
+                    self.parameter_binding.clone().map(Ok),
+                ))
+                .try_collect::<Vec<_>>()
+                .await
+                .map_err(flight_error_to_arrow_error)?;
+
+            self.flight_sql_client

Review Comment:
   This appears consistent with the FlightSQL specification, it uses do_put to 
bind the parameter arguments. What isn't clear to me is if the result should be 
being used in some way. 
   
   This would seem to imply some sort of server-side state which I had perhaps 
expected FlightSQL to not rely on



##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> 
ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }

Review Comment:
   ```suggestion
       match err {
           FlightError::Arrow(e) => e,
           e => ArrowError::ExternalError(Box::new(e))
       }
   ```
   Tbh this could probably be added as a From impl



##########
arrow-flight/src/sql/client.rs:
##########
@@ -515,6 +552,17 @@ fn status_to_arrow_error(status: tonic::Status) -> 
ArrowError {
     ArrowError::IpcError(format!("{status:?}"))
 }
 
+fn flight_error_to_arrow_error(err: FlightError) -> ArrowError {
+    match err {
+        FlightError::Arrow(e) => e,
+        FlightError::NotYetImplemented(s) => ArrowError::NotYetImplemented(s),
+        FlightError::Tonic(status) => status_to_arrow_error(status),
+        FlightError::ProtocolError(e) => ArrowError::IpcError(e),
+        FlightError::DecodeError(s) => ArrowError::IpcError(s),
+        FlightError::ExternalError(e) => ArrowError::ExternalError(e),
+    }
+}

Review Comment:
   I agree this should probably be FlightError



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to