raulcd commented on code in PR #47115:
URL: https://github.com/apache/arrow/pull/47115#discussion_r2224847732
##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const ServerCallContext& context,
class RecordBatchStream::RecordBatchStreamImpl {
public:
- // Stages of the stream when producing payloads
- enum class Stage {
- NEW, // The stream has been created, but Next has not been called yet
- DICTIONARY, // Dictionaries have been collected, and are being sent
- RECORD_BATCH // Initial have been sent
- };
-
RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
const ipc::IpcWriteOptions& options)
- : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+ : reader_(reader), options_(options) {}
std::shared_ptr<Schema> schema() { return reader_->schema(); }
Status GetSchemaPayload(FlightPayload* payload) {
- return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
- &payload->ipc_message);
+ if (!writer_) {
+ // Create the IPC writer on first call
+ auto payload_writer =
+ std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+ auto writer_result = ipc::internal::OpenRecordBatchWriter(
+ std::move(payload_writer), reader_->schema(), options_);
+
+ if (!writer_result.ok()) {
+ return writer_result.status();
+ }
+ writer_ = std::move(writer_result).ValueOrDie();
+ }
+
+ // Return the current payload (schema)
+ if (!payload_list_.empty()) {
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
+ return Status::OK();
+ }
+ return Status::UnknownError("No schema payload generated");
}
Status Next(FlightPayload* payload) {
- if (stage_ == Stage::NEW) {
- RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
- if (!current_batch_) {
- // Signal that iteration is over
- payload->ipc_message.metadata = nullptr;
- return Status::OK();
+ // If we have previous payloads (dictionary messages or previous record batches)
+ // return them first before reading next record batch.
+ if (!payload_list_.empty()) {
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
+ return Status::OK();
+ }
+
+ std::shared_ptr<RecordBatch> batch;
+ RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+ if (!batch) {
+ // End of stream
+ if (writer_) {
+ RETURN_NOT_OK(writer_->Close());
}
- ARROW_ASSIGN_OR_RAISE(dictionaries_,
- ipc::CollectDictionaries(*current_batch_, mapper_));
- stage_ = Stage::DICTIONARY;
+ payload->ipc_message.metadata = nullptr;
+ return Status::OK();
}
- if (stage_ == Stage::DICTIONARY) {
- if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
- stage_ = Stage::RECORD_BATCH;
- return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
- &payload->ipc_message);
- } else {
- return GetNextDictionary(payload);
+ // Check if writer is already initialized or if schema has changed.
+ // To recreate the writer.
+ if (!writer_ || !batch->schema()->Equals(*reader_->schema())) {
+ if (writer_) {
+ RETURN_NOT_OK(writer_->Close());
+ }
+
+ // Create new writer with the batch's schema
+ auto payload_writer =
+ std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+ auto writer_result = ipc::internal::OpenRecordBatchWriter(
+ std::move(payload_writer), batch->schema(), options_);
+
+ if (!writer_result.ok()) {
+ return writer_result.status();
+ }
+ writer_ = std::move(writer_result).ValueOrDie();
+ if (!payload_list_.empty()) {
+ // Drop Schema message if it was generated.
+ // We want new Dictionary or RecordBatch messages only.
+ payload_list_.pop_front();
}
}
- RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
+ // One WriteRecordBatch call might generate multiple payloads, so we
+ // need to collect them in a list.
+ RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
- // TODO(ARROW-10787): Delta dictionaries
- if (!current_batch_) {
- // Signal that iteration is over
- payload->ipc_message.metadata = nullptr;
+ // Return the first generated payload after WriteRecordBatch. There must be
+ // at least one payload generated.
+ if (!payload_list_.empty()) {
+ *payload = std::move(payload_list_.front());
+ payload_list_.pop_front();
return Status::OK();
- } else {
- return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
- &payload->ipc_message);
}
+
+ return Status::UnknownError("IPC writer didn't produce any payloads");
}
- Status Close() { return reader_->Close(); }
+ Status Close() {
+ if (writer_) {
+ RETURN_NOT_OK(writer_->Close());
+ }
+ return reader_->Close();
+ }
private:
- Status GetNextDictionary(FlightPayload* payload) {
- const auto& it = dictionaries_[dictionary_index_++];
- return ipc::GetDictionaryPayload(it.first, it.second, ipc_options_,
- &payload->ipc_message);
- }
+ // Simple payload writer that uses a list of generated payloads.
+ class ServerRecordBatchPayloadWriter : public ipc::internal::IpcPayloadWriter {
+ public:
+ explicit ServerRecordBatchPayloadWriter(std::list<FlightPayload>* payload_list)
+ : payload_list_(payload_list), first_payload_(true) {}
+
+ Status Start() override { return Status::OK(); }
+
+ Status WritePayload(const ipc::IpcPayload& ipc_payload) override {
+ FlightPayload payload;
+ payload.ipc_message = ipc_payload;
+
+ if (first_payload_) {
+ if (ipc_payload.type != ipc::MessageType::SCHEMA) {
+ return Status::Invalid("First IPC message should be schema");
+ }
+ first_payload_ = false;
+ }
Review Comment:
Probably not, the IPC writer should guarantee that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]