raulcd commented on code in PR #47115:
URL: https://github.com/apache/arrow/pull/47115#discussion_r2219263167


##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,139 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+    }
+
+    // Return the current payload (schema)
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+    return Status::UnknownError("No schema payload generated");
   }
 
   Status Next(FlightPayload* payload) {
-    if (stage_ == Stage::NEW) {
-      RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
-      if (!current_batch_) {
-        // Signal that iteration is over
-        payload->ipc_message.metadata = nullptr;
-        return Status::OK();
+    // If we have previous payloads (dictionary messages or previous record 
batches)
+    // return them first before reading next record batch.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+    if (!batch) {
+      // End of stream
+      if (writer_) {
+        RETURN_NOT_OK(writer_->Close());
       }
-      ARROW_ASSIGN_OR_RAISE(dictionaries_,
-                            ipc::CollectDictionaries(*current_batch_, 
mapper_));
-      stage_ = Stage::DICTIONARY;
+      payload->ipc_message.metadata = nullptr;
+      return Status::OK();
     }
 
-    if (stage_ == Stage::DICTIONARY) {
-      if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
-        stage_ = Stage::RECORD_BATCH;
-        return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
-                                          &payload->ipc_message);
-      } else {
-        return GetNextDictionary(payload);
+    // Check if writer is already initialized or if schema has changed.
+    // To recreate the writer.
+    // TODO: Investigate why schema changed is needed for a flight-sql 
specific test.

Review Comment:
   Is a schema change expected after the writer has already been created? This 
was happening in a flight-sql test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to