skejserjensen opened a new issue, #2445:
URL: https://github.com/apache/arrow-rs/issues/2445
**Describe the bug**
The `SchemaResult` produced by `SchemaAsIpc` can be converted to a `Schema`
by the Rust implementation of Apache Arrow Flight, but not by other implementations
of Apache Arrow Flight (we tested the Go, Java, and Python implementations).
**To Reproduce**
For the Rust server, implement `FlightService::get_schema()` as follows:
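```rust
// Inside an `impl FlightService for ...` block. Requires
// `arrow::datatypes::{DataType, Field, Schema, TimeUnit}`,
// `arrow::ipc::writer::IpcWriteOptions`,
// `arrow_flight::{FlightDescriptor, SchemaAsIpc, SchemaResult}`, and
// `tonic::{Request, Response, Status}`.
async fn get_schema(
    &self,
    _request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
    let tid = Field::new("tid", DataType::Int32, false);
    let timestamp = Field::new(
        "timestamp",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    );
    let value = Field::new("value", DataType::Float32, false);
    let schema = Schema::new(vec![tid, timestamp, value]);

    // Serialize the schema with the default IPC options.
    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&schema, &options);
    let schema_result: SchemaResult = schema_as_ipc.into();
    Ok(Response::new(schema_result))
}
```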
Attempt to retrieve and print the `Schema` using the following Rust code:
```rust
use arrow::ipc::convert;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::FlightDescriptor;
use tokio::runtime::Runtime;
use tonic::Request;

fn main() {
    let tokio = Runtime::new().unwrap();
    tokio.block_on(async {
        let mut flight_service_client =
            FlightServiceClient::connect("grpc://127.0.0.1:9999").await.unwrap();
        let flight_descriptor = FlightDescriptor::new_path(vec!["".to_owned()]);
        let request = Request::new(flight_descriptor);
        let schema_result = flight_service_client
            .get_schema(request)
            .await
            .unwrap()
            .into_inner();
        let schema = convert::schema_from_bytes(&schema_result.schema).unwrap();
        dbg!(schema);
    });
}
```
Attempt to retrieve and print the `Schema` using the following Python code:
```python
from pyarrow import flight
client = flight.FlightClient('grpc://127.0.0.1:9999')
descriptor = flight.FlightDescriptor.for_path("")
schema_result = client.get_schema(descriptor)
print(schema_result.schema)
```
**Expected behavior**
All of the implementations should be able to convert the `SchemaResult` to a
`Schema`. Currently, the Rust code successfully retrieves and prints the
`Schema`, while the Python code fails with the following `OSError`:
```python
Traceback (most recent call last):
  File "get_schema.py", line 5, in <module>
    print(schema_result.schema)
  File "pyarrow/_flight.pyx", line 720, in pyarrow._flight.SchemaResult.schema.__get__
  File "pyarrow/_flight.pyx", line 80, in pyarrow._flight.check_flight_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: Invalid flatbuffers message.
```
**Additional context**
We first discovered the issue using a client written in Go, so we initially
raised apache/arrow#13853, believing the problem was in the Go implementation
of Apache Arrow Flight. However, the comments from @zeroshade on that issue
indicate that the Rust implementation of Apache Arrow Flight deviates from the
other implementations in how it serializes a `Schema`. Specifically, both the
C++ and Go implementations prefix the serialized `Schema` with the [continuation
indicator (0xFFFFFFFF) followed by the message length as a 32-bit little-endian
integer](https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format),
whereas the Rust implementation omits this prefix. The programs below serialize
a similar schema with the fields `tid`, `timestamp`, and `value` using each
implementation and print the resulting bytes as unsigned decimal values, with
the prefix shown in bold:
```rust
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::ipc::writer::IpcWriteOptions;
use arrow_flight::{SchemaAsIpc, SchemaResult};

fn main() {
    let tid = Field::new("tid", DataType::Int32, false);
    let timestamp = Field::new(
        "timestamp",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    );
    let value = Field::new("value", DataType::Float32, false);
    let schema = Schema::new(vec![tid, timestamp, value]);
    let options = IpcWriteOptions::default();
    let schema_as_ipc = SchemaAsIpc::new(&schema, &options);
    let schema_result: SchemaResult = schema_as_ipc.into();
    dbg!(schema_result);
}
```
16 0 0 0 0 0 10 0 14 0 12 0 11 0 4 0 10 0 0 0 20 0 0 0 0 0 0 1 4 0 10 0 12 0
0 0 8 0 4 0 10 0 0 0 8 0 0 0 8 0 0 0 0 0 0 0 3 0 0 0 136 0 0 0 52 0 0 0 4 0 0 0
148 255 255 255 16 0 0 0 20 0 0 0 0 0 0 3 16 0 0 0 206 255 255 255 0 0 1 0 0 0
0 0 5 0 0 0 118 97 108 117 101 0 0 0 192 255 255 255 28 0 0 0 12 0 0 0 0 0 0 10
32 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0 6 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 9 0 0 0 116
105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0 15 0 4 0 0 0 8 0 16 0 0
0 24 0 0 0 32 0 0 0 0 0 0 2 28 0 0 0 8 0 12 0 4 0 11 0 8 0 0 0 32 0 0 0 0 0 0 1
0 0 0 0 3 0 0 0 116 105 100 0
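
The Rust bytes above begin directly with the flatbuffer root offset (`16 0 0 0`) instead of the `0xFFFFFFFF` continuation indicator. The minimal sketch below shows how a reader could tell the two layouts apart and skip the 8-byte prefix when it is present. `encapsulated_metadata_len` and `strip_encapsulation` are hypothetical helpers written for illustration, not arrow-rs APIs, and they assume only the current (post-0.15) framing from the linked specification.

```rust
/// Continuation indicator from the Arrow IPC encapsulated message format.
const CONTINUATION_MARKER: [u8; 4] = [0xFF, 0xFF, 0xFF, 0xFF];

/// Hypothetical helper: returns the declared metadata length if `bytes` starts
/// with the continuation marker followed by a little-endian 32-bit length
/// (as in the C++ and Go outputs), or `None` for a bare flatbuffer (as in the
/// Rust output). The length is read as unsigned for simplicity.
fn encapsulated_metadata_len(bytes: &[u8]) -> Option<u32> {
    if bytes.len() < 8 || bytes[..4] != CONTINUATION_MARKER[..] {
        return None;
    }
    Some(u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]))
}

/// Hypothetical helper: returns the flatbuffer `Message` bytes, skipping the
/// 8-byte prefix when present. The legacy pre-0.15 framing (a length prefix
/// without the marker) is deliberately not handled in this sketch.
fn strip_encapsulation(bytes: &[u8]) -> &[u8] {
    match encapsulated_metadata_len(bytes) {
        Some(len) => {
            // Clamp to the buffer so a truncated message cannot cause a panic.
            let end = 8usize.saturating_add(len as usize).min(bytes.len());
            &bytes[8..end]
        }
        None => bytes,
    }
}
```

A client could, for instance, pass `strip_encapsulation(&schema_result.schema)` to `convert::schema_from_bytes` so that it accepts either layout.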
```c++
#include <cstdint>
#include <iostream>
#include <memory>

#include "arrow/buffer.h"
#include "arrow/ipc/writer.h"
#include "arrow/type.h"

int main() {
  std::shared_ptr<arrow::Field> tid = arrow::field("tid", arrow::int32());
  std::shared_ptr<arrow::Field> timestamp =
      arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::MILLI));
  std::shared_ptr<arrow::Field> value = arrow::field("value", arrow::float32());
  std::shared_ptr<arrow::Schema> schema =
      arrow::schema({tid, timestamp, value});

  // Serialize the schema as an encapsulated IPC message.
  std::shared_ptr<arrow::Buffer> serialized_schema =
      arrow::ipc::SerializeSchema(*schema).ValueOrDie();

  // Print each byte as an unsigned decimal value. Buffer::size() returns
  // int64_t, so use a matching index type to avoid a signed/unsigned mismatch.
  for (int64_t index = 0; index < serialized_schema->size(); index++) {
    std::cout << unsigned((*serialized_schema)[index]) << ' ';
  }
  std::cout << std::endl;
}
```
**255 255 255 255 224 0 0 0** 16 0 0 0 0 0 10 0 12 0 6 0 5 0 8 0 10 0 0 0 0
1 4 0 12 0 0 0 8 0 8 0 0 0 4 0 8 0 0 0 4 0 0 0 3 0 0 0 124 0 0 0 52 0 0 0 4 0 0
0 160 255 255 255 0 0 1 3 16 0 0 0 24 0 0 0 4 0 0 0 0 0 0 0 5 0 0 0 118 97 108
117 101 0 0 0 210 255 255 255 0 0 1 0 204 255 255 255 0 0 1 10 16 0 0 0 32 0 0
0 4 0 0 0 0 0 0 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 6 0 8 0 6 0 6 0
0 0 0 0 1 0 16 0 20 0 8 0 6 0 7 0 12 0 0 0 16 0 16 0 0 0 0 0 1 2 16 0 0 0 28 0
0 0 4 0 0 0 0 0 0 0 3 0 0 0 116 105 100 0 8 0 12 0 8 0 7 0 8 0 0 0 0 0 0 1 32 0
0 0
```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/flight"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "tid", Type: arrow.PrimitiveTypes.Int32},
			{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ms},
			{Name: "value", Type: arrow.PrimitiveTypes.Float32},
		},
		nil,
	)
	serializedSchema := flight.SerializeSchema(schema, memory.DefaultAllocator)
	fmt.Println(serializedSchema)
}
```
**255 255 255 255 248 0 0 0** 16 0 0 0 0 0 10 0 12 0 10 0 9 0 4 0 10 0 0 0
16 0 0 0 0 1 4 0 8 0 8 0 0 0 4 0 8 0 0 0 4 0 0 0 3 0 0 0 148 0 0 0 60 0 0 0 4 0
0 0 136 255 255 255 16 0 0 0 24 0 0 0 0 0 0 3 24 0 0 0 0 0 0 0 0 0 6 0 8 0 6 0
6 0 0 0 0 0 1 0 5 0 0 0 118 97 108 117 101 0 0 0 188 255 255 255 16 0 0 0 24 0
0 0 0 0 0 10 36 0 0 0 0 0 0 0 8 0 12 0 10 0 4 0 8 0 0 0 8 0 0 0 0 0 1 0 3 0 0 0
85 84 67 0 9 0 0 0 116 105 109 101 115 116 97 109 112 0 0 0 16 0 20 0 16 0 0 0
15 0 8 0 0 0 4 0 16 0 0 0 16 0 0 0 24 0 0 0 0 0 0 2 28 0 0 0 0 0 0 0 8 0 12 0 8
0 7 0 8 0 0 0 0 0 0 1 32 0 0 0 3 0 0 0 116 105 100 0 255 255 255 255 0 0 0 0
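
The framing used by the C++ and Go outputs can be reproduced from the bare flatbuffer bytes that the Rust implementation currently emits. The sketch below uses a hypothetical `encapsulate` helper, not part of arrow-rs. It writes the continuation indicator, a little-endian 32-bit length, the flatbuffer, and zero padding up to an 8-byte boundary, with the length field counting the padding as the encapsulated message format specifies (both 224 and 248 above are multiples of 8). It does not append the trailing `255 255 255 255 0 0 0 0` end-of-stream marker that appears at the end of the Go output.

```rust
/// Continuation indicator from the Arrow IPC encapsulated message format.
const CONTINUATION_MARKER: [u8; 4] = [0xFF, 0xFF, 0xFF, 0xFF];

/// Hypothetical helper: wraps bare flatbuffer `Message` bytes as
/// <marker><metadata length: i32 LE><flatbuffer><zero padding>.
fn encapsulate(flatbuffer: &[u8]) -> Vec<u8> {
    // The 8-byte prefix is already 8-aligned, so only the metadata is padded.
    let padded_len = (flatbuffer.len() + 7) / 8 * 8;
    assert!(padded_len <= i32::MAX as usize, "metadata too large for i32 length");

    let mut framed = Vec::with_capacity(8 + padded_len);
    framed.extend_from_slice(&CONTINUATION_MARKER);
    // The length field includes the padding bytes.
    framed.extend_from_slice(&(padded_len as i32).to_le_bytes());
    framed.extend_from_slice(flatbuffer);
    // Zero-fill up to the padded length.
    framed.resize(8 + padded_len, 0);
    framed
}
```

As a possible server-side workaround, and only a sketch rather than a tested fix, the bytes in `schema_result.schema` could be passed through a helper like this before the `SchemaResult` is returned.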