RoyZhang2022 opened a new issue, #5361:
URL: https://github.com/apache/arrow-rs/issues/5361

   **Which part is this question about**
   <!--
   Is it code base, library api, documentation or some other part?
   -->
   arrow-flight
   
   **Describe your question**
   <!--
   A clear and concise description of what the question is.
   -->
   I am using [arrow-flightsql-odbc](https://github.com/timvw/arrow-flightsql-odbc/) as a FlightSQL server and `flight_sql_client` from arrow-rs/arrow-flight as a client to test it. The client connects to the server successfully, but when I run a simple query (`select 1`) it always fails with the error `"Received RecordBatch prior to Schema"`, and I am trying to figure out why. On the server side I printed the RecordBatch value (below); it includes both the schema and the result in a single RecordBatch. On the client side the flight_data is also printed, and we can see the `FlightDataDecoder` state is `None`, which is what causes the failure: the RecordBatch handling in `arrow-rs/arrow-flight/src/decode.rs` returns the `"Received RecordBatch prior to Schema"` error whenever the state is `None`. So my question is: how can I make the state not be `None`? The RecordBatch already includes the schema; should I send the schema in a separate standalone message? The schema is already sent in the first FlightInfo message, and the FlightInfo is handled in `execute_flight` of arrow-rs/arrow-flight/src/bin/flight_sql_client.rs, so I am not sure what is wrong. Could someone help here? Thanks in advance!
   
   -- arrow-flightsql-odbc server-side RecordBatch value
   ```
   [src/odbc_command_handler.rs:176] batch = RecordBatch {
       schema: Schema {
           fields: [
               Field {
                   name: "",
                   data_type: Int32,
                   nullable: false,
                   dict_id: 0,
                   dict_is_ordered: false,
                   metadata: {},
               },
           ],
           metadata: {},
       },
       columns: [
           PrimitiveArray<Int32>
           [
             1,
           ],
       ],
       row_count: 1,
   }
   ```
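   Note that even though each RecordBatch carries its own schema in memory, the Flight stream protocol still expects a dedicated Schema message as the first FlightData of the `do_get` stream, before any RecordBatch messages. A hedged server-side sketch of producing such a stream, assuming the arrow-flight crate's `encode::FlightDataEncoderBuilder` (which prepends that Schema message automatically); the function name `encode_response` is just for illustration:
   ```
   // Sketch only: assumes the `arrow-flight`, `arrow-array`, and `futures` crates.
   use arrow_flight::encode::FlightDataEncoderBuilder;
   use arrow_flight::error::FlightError;
   use futures::stream::{self, StreamExt};

   // `batches` would be the query results, e.g. the `select 1` batch above.
   async fn encode_response(
       batches: Vec<arrow_array::RecordBatch>,
   ) -> Vec<Result<arrow_flight::FlightData, FlightError>> {
       let input = stream::iter(batches.into_iter().map(Ok));
       // The encoder emits a Schema message first, then one or more
       // RecordBatch messages -- the ordering the client-side
       // FlightDataDecoder expects.
       FlightDataEncoderBuilder::new().build(input).collect().await
   }
   ```
   If the server instead hand-rolls its FlightData messages and only ever sends RecordBatch headers, the client's decoder never leaves the `None` state and fails exactly as reported.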
   
   -- flight_data at client side
   ```
   [arrow-flight/src/bin/flight_sql_client.rs:210] &flight_data = 
FlightRecordBatchStream {
       headers: MetadataMap {
           headers: {
               "content-type": "application/grpc",
               "date": "Fri, 02 Feb 2024 22:30:31 GMT",
           },
       },
       trailers: Some(
           LazyTrailers {
               trailers: Mutex {
                   data: None,
                   poisoned: false,
                   ..
               },
           },
       ),
       inner: FlightDataDecoder {
           response: "<stream>",
           state: None,
           done: false,
       },
   }
   ```
   -- code from arrow-rs/arrow-flight/src/decode.rs
   ```
   impl FlightDataDecoder {
        ......
       /// Extracts flight data from the next message, updating decoding
       /// state as necessary.
       fn extract_message(&mut self, data: FlightData) -> 
Result<Option<DecodedFlightData>> {
           use arrow_ipc::MessageHeader;
           let message = arrow_ipc::root_as_message(&data.data_header[..])
               .map_err(|e| FlightError::DecodeError(format!("Error decoding 
root message: {e}")))?;
   
           match message.header_type() {
               MessageHeader::RecordBatch => {
                   println!("Value of self.state before if: {:?}", self.state);
                   let state = if let Some(state) = self.state.as_ref() {
                       state
                   } else {
                       return Err(FlightError::protocol(
                           "Received RecordBatch prior to Schema",
                       ));
                   };
   
   ```
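   The ordering rule enforced by that snippet can be modeled with a tiny, self-contained state machine (a simplified sketch, not the real arrow-flight types): the decoder's state starts as `None` and is only populated when a Schema message arrives, so any RecordBatch seen before that is rejected with exactly this error.
   ```
   /// Simplified model of FlightDataDecoder's ordering rule: state is
   /// `None` until a Schema message is seen; a RecordBatch arriving
   /// first is a protocol error.
   #[derive(Debug, PartialEq)]
   enum Message {
       Schema,
       RecordBatch,
   }

   #[derive(Default)]
   struct Decoder {
       // Mirrors `self.state: Option<...>` in decode.rs (here just a flag).
       state: Option<()>,
   }

   impl Decoder {
       fn extract_message(&mut self, msg: Message) -> Result<&'static str, String> {
           match msg {
               Message::Schema => {
                   self.state = Some(());
                   Ok("schema decoded, state initialized")
               }
               Message::RecordBatch => {
                   if self.state.is_none() {
                       // Same failure mode as the snippet above.
                       return Err("Received RecordBatch prior to Schema".to_string());
                   }
                   Ok("batch decoded")
               }
           }
       }
   }

   fn main() {
       // A stream that starts with a batch reproduces the reported error...
       let mut bad = Decoder::default();
       assert!(bad.extract_message(Message::RecordBatch).is_err());

       // ...while a schema-first stream decodes cleanly.
       let mut good = Decoder::default();
       assert!(good.extract_message(Message::Schema).is_ok());
       assert!(good.extract_message(Message::RecordBatch).is_ok());
       println!("ordering rule verified");
   }
   ```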
   
   -- FlightInfo handling in arrow-rs/arrow-flight/src/bin/flight_sql_client.rs
   ```
   async fn execute_flight(
       client: &mut FlightSqlServiceClient<Channel>,
       info: FlightInfo,
   ) -> Result<Vec<RecordBatch>> {
       let schema = Arc::new(Schema::try_from(info.clone()).context("valid 
schema")?);
       dbg!(&schema);
       let mut batches = Vec::with_capacity(info.endpoint.len() + 1);
       batches.push(RecordBatch::new_empty(schema));
       info!("decoded schema");
   
       for endpoint in info.endpoint {
           let Some(ticket) = &endpoint.ticket else {
               bail!("did not get ticket");
           };
   
           let mut flight_data = 
client.do_get(ticket.clone()).await.context("do get")?;
           dbg!(&flight_data);
           log_metadata(flight_data.headers(), "header");
   
           let mut endpoint_batches: Vec<_> = (&mut flight_data)
               .try_collect()
               .await
               .context("collect data stream")?;
           batches.append(&mut endpoint_batches);
   
           if let Some(trailers) = flight_data.trailers() {
               log_metadata(&trailers, "trailer");
           }
       }
       info!("received data");
   
       Ok(batches)
   }
   ```
   
   **Additional context**
   <!--
   Add any other context about the problem here.
   -->
   Running on Ubuntu 20.04 64-bit. I also updated the flightsql server's proto files and its arrow and arrow-odbc dependencies to the newest versions.

