lidavidm commented on code in PR #47115:
URL: https://github.com/apache/arrow/pull/47115#discussion_r2227169956
##########
cpp/src/arrow/flight/sql/example/sqlite_tables_schema_batch_reader.cc:
##########
Review Comment:
Thanks for tracking this down!
##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,99 @@ Status FlightServerBase::GetSchema(const
ServerCallContext& context,
class RecordBatchStream::RecordBatchStreamImpl {
public:
- // Stages of the stream when producing payloads
- enum class Stage {
- NEW, // The stream has been created, but Next has not been called
yet
- DICTIONARY, // Dictionaries have been collected, and are being sent
- RECORD_BATCH // Initial have been sent
- };
-
RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
const ipc::IpcWriteOptions& options)
- : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+ : reader_(reader), options_(options) {}
std::shared_ptr<Schema> schema() { return reader_->schema(); }
Status GetSchemaPayload(FlightPayload* payload) {
- return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
- &payload->ipc_message);
+ if (!writer_) {
+ // Create the IPC writer on first call
+ auto payload_writer =
+ std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+ ARROW_ASSIGN_OR_RAISE(
+ writer_,
ipc::internal::OpenRecordBatchWriter(std::move(payload_writer),
+ reader_->schema(),
options_));
+ }
+
+ // Return the expected schema payload.
+ if (payload_list_.empty()) {
+ return Status::UnknownError("No schema payload generated");
+ }
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
+ return Status::OK();
}
Status Next(FlightPayload* payload) {
- if (stage_ == Stage::NEW) {
- RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
- if (!current_batch_) {
- // Signal that iteration is over
+ // If we have previous payloads (dictionary messages or previous record
batches)
+ // we will return them before reading the next record batch.
+ if (payload_list_.empty()) {
+ std::shared_ptr<RecordBatch> batch;
+ RETURN_NOT_OK(reader_->ReadNext(&batch));
+ if (!batch) {
+ // End of stream
+ if (writer_) {
+ RETURN_NOT_OK(writer_->Close());
+ }
payload->ipc_message.metadata = nullptr;
return Status::OK();
}
- ARROW_ASSIGN_OR_RAISE(dictionaries_,
- ipc::CollectDictionaries(*current_batch_,
mapper_));
- stage_ = Stage::DICTIONARY;
+ if (!writer_) {
+ return Status::UnknownError(
+ "Writer should be initialized before reading Next batches");
+ }
+ // One WriteRecordBatch call might generate multiple payloads, so we
+ // need to collect them in a list.
+ RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
}
- if (stage_ == Stage::DICTIONARY) {
- if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
- stage_ = Stage::RECORD_BATCH;
- return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
- &payload->ipc_message);
- } else {
- return GetNextDictionary(payload);
- }
+ // There must be at least one payload generated after WriteRecordBatch or
+ // from previous calls to WriteRecordBatch.
+ if (payload_list_.empty()) {
+ return Status::UnknownError("IPC writer didn't produce any payloads");
}
- RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
+ return Status::OK();
+ }
- // TODO(ARROW-10787): Delta dictionaries
- if (!current_batch_) {
- // Signal that iteration is over
- payload->ipc_message.metadata = nullptr;
- return Status::OK();
- } else {
- return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
- &payload->ipc_message);
+ Status Close() {
+ if (writer_) {
+ RETURN_NOT_OK(writer_->Close());
}
+ return reader_->Close();
}
- Status Close() { return reader_->Close(); }
-
private:
- Status GetNextDictionary(FlightPayload* payload) {
- const auto& it = dictionaries_[dictionary_index_++];
- return ipc::GetDictionaryPayload(it.first, it.second, ipc_options_,
- &payload->ipc_message);
- }
+ // Simple payload writer that uses a list of generated payloads.
+ class ServerRecordBatchPayloadWriter : public
ipc::internal::IpcPayloadWriter {
+ public:
+ explicit ServerRecordBatchPayloadWriter(std::list<FlightPayload>*
payload_list)
+ : payload_list_(payload_list) {}
- Stage stage_ = Stage::NEW;
- std::shared_ptr<RecordBatchReader> reader_;
- ipc::DictionaryFieldMapper mapper_;
- ipc::IpcWriteOptions ipc_options_;
- std::shared_ptr<RecordBatch> current_batch_;
- std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;
+ Status Start() override { return Status::OK(); }
- // Index of next dictionary to send
- int dictionary_index_ = 0;
+ Status WritePayload(const ipc::IpcPayload& ipc_payload) override {
+ FlightPayload payload;
+ payload.ipc_message = ipc_payload;
+
+ payload_list_->push_back(std::move(payload));
+ return Status::OK();
+ }
+
+ Status Close() override { return Status::OK(); }
+
+ private:
+ std::list<FlightPayload>* payload_list_;
+ };
+
+ std::shared_ptr<RecordBatchReader> reader_;
+ ipc::IpcWriteOptions options_;
+ std::unique_ptr<ipc::RecordBatchWriter> writer_;
+ std::list<FlightPayload> payload_list_;
Review Comment:
Ah sorry one last thing: I would prefer `std::deque` (deque) for a
double-ended queue over `std::list` (linked list)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]