raulcd commented on code in PR #47115: URL: https://github.com/apache/arrow/pull/47115#discussion_r2219263167
########## cpp/src/arrow/flight/server.cc: ########## @@ -275,78 +277,139 @@ Status FlightServerBase::GetSchema(const ServerCallContext& context, class RecordBatchStream::RecordBatchStreamImpl { public: - // Stages of the stream when producing payloads - enum class Stage { - NEW, // The stream has been created, but Next has not been called yet - DICTIONARY, // Dictionaries have been collected, and are being sent - RECORD_BATCH // Initial have been sent - }; - RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader, const ipc::IpcWriteOptions& options) - : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {} + : reader_(reader), options_(options) {} std::shared_ptr<Schema> schema() { return reader_->schema(); } Status GetSchemaPayload(FlightPayload* payload) { - return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_, - &payload->ipc_message); + if (!writer_) { + // Create the IPC writer on first call + auto payload_writer = + std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_); + auto writer_result = ipc::internal::OpenRecordBatchWriter( + std::move(payload_writer), reader_->schema(), options_); + + if (!writer_result.ok()) { + return writer_result.status(); + } + writer_ = std::move(writer_result).ValueOrDie(); + } + + // Return the current payload (schema) + if (!payload_list_.empty()) { + *payload = std::move(payload_list_.front()); + payload_list_.pop_front(); + return Status::OK(); + } + return Status::UnknownError("No schema payload generated"); } Status Next(FlightPayload* payload) { - if (stage_ == Stage::NEW) { - RETURN_NOT_OK(reader_->ReadNext(&current_batch_)); - if (!current_batch_) { - // Signal that iteration is over - payload->ipc_message.metadata = nullptr; - return Status::OK(); + // If we have previous payloads (dictionary messages or previous record batches) + // return them first before reading next record batch. 
+ if (!payload_list_.empty()) { + *payload = std::move(payload_list_.front()); + payload_list_.pop_front(); + return Status::OK(); + } + + std::shared_ptr<RecordBatch> batch; + RETURN_NOT_OK(reader_->ReadNext(&batch)); + + if (!batch) { + // End of stream + if (writer_) { + RETURN_NOT_OK(writer_->Close()); } - ARROW_ASSIGN_OR_RAISE(dictionaries_, - ipc::CollectDictionaries(*current_batch_, mapper_)); - stage_ = Stage::DICTIONARY; + payload->ipc_message.metadata = nullptr; + return Status::OK(); } - if (stage_ == Stage::DICTIONARY) { - if (dictionary_index_ == static_cast<int>(dictionaries_.size())) { - stage_ = Stage::RECORD_BATCH; - return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_, - &payload->ipc_message); - } else { - return GetNextDictionary(payload); + // Check if writer is already initialized or if schema has changed. + // To recreate the writer. + // TODO: Investigate why schema changed is needed for a flight-sql specific test. Review Comment: Is a change of schema with an already generated writer expected? This was happening on a flight-sql test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org