raulcd commented on code in PR #47115:
URL: https://github.com/apache/arrow/pull/47115#discussion_r2225045071
##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const
ServerCallContext& context,
class RecordBatchStream::RecordBatchStreamImpl {
public:
- // Stages of the stream when producing payloads
- enum class Stage {
- NEW, // The stream has been created, but Next has not been called
yet
- DICTIONARY, // Dictionaries have been collected, and are being sent
- RECORD_BATCH // Initial have been sent
- };
-
RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
const ipc::IpcWriteOptions& options)
- : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+ : reader_(reader), options_(options) {}
std::shared_ptr<Schema> schema() { return reader_->schema(); }
Status GetSchemaPayload(FlightPayload* payload) {
- return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
- &payload->ipc_message);
+ if (!writer_) {
+ // Create the IPC writer on first call
+ auto payload_writer =
+ std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+ auto writer_result = ipc::internal::OpenRecordBatchWriter(
+ std::move(payload_writer), reader_->schema(), options_);
+
+ if (!writer_result.ok()) {
+ return writer_result.status();
+ }
+ writer_ = std::move(writer_result).ValueOrDie();
+ }
+
+ // Return the current payload (schema)
+ if (!payload_list_.empty()) {
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
+ return Status::OK();
+ }
+ return Status::UnknownError("No schema payload generated");
}
Status Next(FlightPayload* payload) {
- if (stage_ == Stage::NEW) {
- RETURN_NOT_OK(reader_->ReadNext(¤t_batch_));
- if (!current_batch_) {
- // Signal that iteration is over
- payload->ipc_message.metadata = nullptr;
- return Status::OK();
+ // If we have previous payloads (dictionary messages or previous record
batches)
+ // return them first before reading next record batch.
+ if (!payload_list_.empty()) {
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
+ return Status::OK();
+ }
+
+ std::shared_ptr<RecordBatch> batch;
+ RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+ if (!batch) {
+ // End of stream
+ if (writer_) {
+ RETURN_NOT_OK(writer_->Close());
}
- ARROW_ASSIGN_OR_RAISE(dictionaries_,
- ipc::CollectDictionaries(*current_batch_,
mapper_));
- stage_ = Stage::DICTIONARY;
+ payload->ipc_message.metadata = nullptr;
+ return Status::OK();
}
- if (stage_ == Stage::DICTIONARY) {
- if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
- stage_ = Stage::RECORD_BATCH;
- return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
- &payload->ipc_message);
- } else {
- return GetNextDictionary(payload);
+ // Check if writer is already initialized or if schema has changed.
+ // To recreate the writer.
Review Comment:
The failing test is `TestFlightSqlServer.TestCommandGetTablesWithIncludedSchemas`,
which failed with:
```c++
/home/raulcd/code/arrow/cpp/src/arrow/flight/sql/example/sqlite_server.cc:395:
GetTables include_schema=1
/home/raulcd/code/arrow/cpp/src/arrow/flight/sql/example/sqlite_server.cc:409:
GetTables: SELECT 'main' as catalog_name, null as schema_name, name as
table_name, type as table_type FROM sqlite_master where 1=1 and table_name LIKE
'int%' order by table_name
Reading next payload from data stream: 1
/home/raulcd/code/arrow/cpp/src/arrow/flight/sql/server_test.cc:325: Failure
Failed
'_error_or_value62.status()' failed with Invalid: Tried to write record
batch with different schema
/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:339
writer_->WriteRecordBatch(*batch)
/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:415
impl_->Next(&payload)
/home/raulcd/code/arrow/cpp/src/arrow/flight/transport_server.cc:291
data_stream->Next(). gRPC client debug context:
{"created":"@1753261907.620461787","description":"Error received from peer
ipv4:127.0.0.1:40331","file":"/home/raulcd/code/arrow/cpp/build/grpc_ep-prefix/src/grpc_ep/src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Tried
to write record batch with different
schema\n/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:339
writer_->WriteRecordBatch(*batch)\n/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:415
impl_->Next(&payload)\n/home/raulcd/code/arrow/cpp/src/arrow/flight/transport_server.cc:291
data_stream->Next()","grpc_status":3}. Client context: OK
/home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:219
stream_->Finish(Status::OK())
/home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:256 Next()
/home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:116
ToRecordBatches(stop_token)
```
The only difference between the two schemas is the nullability of `table_schema`: it is `binary` (nullable) in the batch schema but `binary not null` in the reader schema:
```md
## Batch schema:
catalog_name: string
db_schema_name: string
table_name: string not null
table_type: string not null
table_schema: binary
## Reader Schema:
catalog_name: string
db_schema_name: string
table_name: string not null
table_type: string not null
table_schema: binary not null
```
I've found the issue and fixed it here:
https://github.com/apache/arrow/pull/47115/commits/d4305a8de40802db19bfaa27528bd94203219a56
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]