suremarc opened a new issue, #4658:
URL: https://github.com/apache/arrow-rs/issues/4658

   **Describe the bug**
   When executing `DoPutPreparedStatementQuery` requests to bind parameters to 
a prepared statement, using the `DoPut` Flight RPC, one may receive an error 
like this:
   ```
   rpc error: code = Internal desc = Received RecordBatch prior to Schema
   ```
   
   This is because the implementation of `FlightService` for FlightSQL servers 
consumes the first flight batch by calling `.message()` on the 
`Request<Streaming<FlightData>>` handle:
   ```rust
   async fn do_put(
       &self,
       mut request: Request<Streaming<FlightData>>,
   ) -> Result<Response<Self::DoPutStream>, Status> {
       let cmd = request.get_mut().message().await?.unwrap();
   ```
   It does this to inspect the type of the `FlightDescriptor`, which tells it 
which `DoPut` handler to dispatch to in the `FlightSqlService`. If the schema 
was encoded in the very first message together with the `FlightDescriptor`, 
calling `.message()` consumes both the `FlightDescriptor` and the schema, so 
a `FlightSqlService` handler such as `do_put_prepared_statement_query` can no 
longer read the schema from the `Request<Streaming<FlightData>>`. The first 
message the handler's decoder sees is then a record batch, which produces the 
error above.
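
To make the failure mode concrete, here is a minimal std-only model of it. The `Message` enum, `decode`, `dispatch_then_decode`, and `sample_stream` are hypothetical stand-ins invented for illustration; they are not arrow-flight types, and the real decoder is considerably more involved.

```rust
// Simplified stand-in for `FlightData` (hypothetical, not an arrow-flight type).
#[derive(Debug)]
enum Message {
    // First message: carries the descriptor and, in this model, the schema too.
    Schema { descriptor: Option<String>, fields: Vec<String> },
    // Follow-up messages: record batches encoded against that schema.
    Batch(Vec<i64>),
}

// Toy decoder that, like a Flight stream decoder, must see the schema first.
fn decode(messages: impl Iterator<Item = Message>) -> Result<Vec<Vec<i64>>, String> {
    let mut schema: Option<Vec<String>> = None;
    let mut batches = Vec::new();
    for message in messages {
        match message {
            Message::Schema { fields, .. } => schema = Some(fields),
            Message::Batch(rows) => {
                if schema.is_none() {
                    return Err("Received RecordBatch prior to Schema".to_string());
                }
                batches.push(rows);
            }
        }
    }
    Ok(batches)
}

// Mirrors the dispatch step: consume the first message to read the descriptor,
// then hand only the remaining messages to the handler's decoder.
fn dispatch_then_decode(
    mut messages: impl Iterator<Item = Message>,
) -> Result<Vec<Vec<i64>>, String> {
    let first = messages.next().ok_or_else(|| "empty stream".to_string())?;
    match first {
        // The schema travelled in `first`, so it is dropped here.
        Message::Schema { descriptor: Some(_cmd), .. } => decode(messages),
        _ => Err("first message carries no descriptor".to_string()),
    }
}

// A two-message stream: descriptor plus schema, then one batch.
fn sample_stream() -> Vec<Message> {
    vec![
        Message::Schema {
            descriptor: Some("CommandPreparedStatementQuery".to_string()),
            fields: vec!["a".to_string()],
        },
        Message::Batch(vec![1, 2, 3]),
    ]
}
```

Decoding the full stream succeeds, while decoding after the dispatch step fails with the same message as the reported error, because the decoder never sees the schema.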
   
   **To Reproduce**
   Implement a `FlightSqlService` that handles any kind of `DoPut` request and 
decodes the flight stream, such as the following:
   ```rust
   impl FlightSqlService for FlightSqlServer {
       async fn do_put_prepared_statement_query(
           &self,
           query: CommandPreparedStatementQuery,
           request: Request<Streaming<FlightData>>,
       ) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
           info!("do_put_prepared_statement_query");
           let parameters = FlightRecordBatchStream::new_from_flight_data(
               request.into_inner().map_err(|e| e.into()),
           )
           .try_collect::<Vec<_>>()
           .await?;
   
           todo!()
       }
   }
   ```
   
   Then execute a `DoPut` using a raw `do_put` request on the 
`FlightSqlServiceClient`, sending the following flight stream:
   ```rust
   let flight_stream = FlightDataEncoderBuilder::new()
       .with_flight_descriptor(Some(FlightDescriptor::new_cmd(
           cmd.as_any().encode_to_vec(),
       )))
       .with_schema(batch.schema())
       .build(stream::once(async { Ok(batch) }));
   client
       .do_put(flight_stream.map(Result::unwrap))
       .await
       .context("submit DoPut request")?
       .try_collect::<Vec<_>>()
       .await
       .context("read DoPut response")?;
   ```
   
   **Expected behavior**
   The server should be able to properly handle flight streams sent from a 
compliant FlightSQL client. 
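
One possible way to get this behavior, sketched here with std iterators only, is to peek at the first message instead of consuming it, so the handler still receives the complete stream. The `Message` enum, `decode`, and `peek_then_decode` below are hypothetical names for illustration; this is a sketch of the idea under those assumptions, not the arrow-flight API or a proposed patch.

```rust
// Simplified stand-in for `FlightData` (hypothetical, not an arrow-flight type).
#[derive(Debug)]
enum Message {
    // First message: descriptor and schema travel together in this model.
    Schema { descriptor: Option<String> },
    // Follow-up messages: record batches.
    Batch(Vec<i64>),
}

// Toy decoder that rejects a batch arriving before the schema.
fn decode(messages: impl Iterator<Item = Message>) -> Result<Vec<Vec<i64>>, String> {
    let mut seen_schema = false;
    let mut batches = Vec::new();
    for message in messages {
        match message {
            Message::Schema { .. } => seen_schema = true,
            Message::Batch(rows) => {
                if !seen_schema {
                    return Err("Received RecordBatch prior to Schema".to_string());
                }
                batches.push(rows);
            }
        }
    }
    Ok(batches)
}

// Read the descriptor by peeking, so the first message stays in the stream
// and the decoder still sees the schema before any batch.
fn peek_then_decode(
    messages: impl Iterator<Item = Message>,
) -> Result<(String, Vec<Vec<i64>>), String> {
    let mut messages = messages.peekable();
    let cmd = match messages.peek() {
        Some(Message::Schema { descriptor: Some(cmd) }) => cmd.clone(),
        _ => return Err("first message carries no descriptor".to_string()),
    };
    // The full stream, first message included, goes to the handler's decoder.
    let batches = decode(messages)?;
    Ok((cmd, batches))
}
```

An alternative with the same effect would be to consume the first message and chain it back in front of the remaining stream before passing it to the handler.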
   
   **Additional context**
   This bug was uncovered while implementing and testing prepared statements 
for a Rust FlightSQL server. The same bug was observed when submitting queries 
to that server from both Rust and Go FlightSQL clients. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

