raulcd commented on code in PR #47115: URL: https://github.com/apache/arrow/pull/47115#discussion_r2225045071
########## cpp/src/arrow/flight/server.cc: ########## @@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const ServerCallContext& context, class RecordBatchStream::RecordBatchStreamImpl { public: - // Stages of the stream when producing payloads - enum class Stage { - NEW, // The stream has been created, but Next has not been called yet - DICTIONARY, // Dictionaries have been collected, and are being sent - RECORD_BATCH // Initial have been sent - }; - RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader, const ipc::IpcWriteOptions& options) - : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {} + : reader_(reader), options_(options) {} std::shared_ptr<Schema> schema() { return reader_->schema(); } Status GetSchemaPayload(FlightPayload* payload) { - return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_, - &payload->ipc_message); + if (!writer_) { + // Create the IPC writer on first call + auto payload_writer = + std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_); + auto writer_result = ipc::internal::OpenRecordBatchWriter( + std::move(payload_writer), reader_->schema(), options_); + + if (!writer_result.ok()) { + return writer_result.status(); + } + writer_ = std::move(writer_result).ValueOrDie(); + } + + // Return the current payload (schema) + if (!payload_list_.empty()) { + *payload = std::move(payload_list_.front()); + payload_list_.pop_front(); + return Status::OK(); + } + return Status::UnknownError("No schema payload generated"); } Status Next(FlightPayload* payload) { - if (stage_ == Stage::NEW) { - RETURN_NOT_OK(reader_->ReadNext(&current_batch_)); - if (!current_batch_) { - // Signal that iteration is over - payload->ipc_message.metadata = nullptr; - return Status::OK(); + // If we have previous payloads (dictionary messages or previous record batches) + // return them first before reading next record batch.
+ if (!payload_list_.empty()) { + *payload = std::move(payload_list_.front()); + payload_list_.pop_front(); + return Status::OK(); + } + + std::shared_ptr<RecordBatch> batch; + RETURN_NOT_OK(reader_->ReadNext(&batch)); + + if (!batch) { + // End of stream + if (writer_) { + RETURN_NOT_OK(writer_->Close()); } - ARROW_ASSIGN_OR_RAISE(dictionaries_, - ipc::CollectDictionaries(*current_batch_, mapper_)); - stage_ = Stage::DICTIONARY; + payload->ipc_message.metadata = nullptr; + return Status::OK(); } - if (stage_ == Stage::DICTIONARY) { - if (dictionary_index_ == static_cast<int>(dictionaries_.size())) { - stage_ = Stage::RECORD_BATCH; - return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_, - &payload->ipc_message); - } else { - return GetNextDictionary(payload); + // Check if writer is already initialized or if schema has changed. + // To recreate the writer. Review Comment: The test is `TestFlightSqlServer.TestCommandGetTablesWithIncludedSchemas` and failed with: ```c++ /home/raulcd/code/arrow/cpp/src/arrow/flight/sql/example/sqlite_server.cc:395: GetTables include_schema=1 /home/raulcd/code/arrow/cpp/src/arrow/flight/sql/example/sqlite_server.cc:409: GetTables: SELECT 'main' as catalog_name, null as schema_name, name as table_name, type as table_type FROM sqlite_master where 1=1 and table_name LIKE 'int%' order by table_name Reading next payload from data stream: 1 /home/raulcd/code/arrow/cpp/src/arrow/flight/sql/server_test.cc:325: Failure Failed '_error_or_value62.status()' failed with Invalid: Tried to write record batch with different schema /home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:339 writer_->WriteRecordBatch(*batch) /home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:415 impl_->Next(&payload) /home/raulcd/code/arrow/cpp/src/arrow/flight/transport_server.cc:291 data_stream->Next(). 
gRPC client debug context: {"created":"@1753261907.620461787","description":"Error received from peer ipv4:127.0.0.1:40331","file":"/home/raulcd/code/arrow/cpp/build/grpc_ep-prefix/src/grpc_ep/src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Tried to write record batch with different schema\n/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:339 writer_->WriteRecordBatch(*batch)\n/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:415 impl_->Next(&payload)\n/home/raulcd/code/arrow/cpp/src/arrow/flight/transport_server.cc:291 data_stream->Next()","grpc_status":3}. Client context: OK /home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:219 stream_->Finish(Status::OK()) /home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:256 Next() /home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:116 ToRecordBatches(stop_token) ``` The schemas differed only in `table_schema`: it is `binary` in the batch schema but `binary not null` in the reader schema: ```md ## Batch schema: catalog_name: string db_schema_name: string table_name: string not null table_type: string not null table_schema: binary ## Reader schema: catalog_name: string db_schema_name: string table_name: string not null table_type: string not null table_schema: binary not null ``` I've found the issue and fixed it here: https://github.com/apache/arrow/pull/47115/commits/d4305a8de40802db19bfaa27528bd94203219a56 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org