lidavidm commented on code in PR #47115:
URL: https://github.com/apache/arrow/pull/47115#discussion_r2224098026


##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+    }
+
+    // Return the current payload (schema)
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+    return Status::UnknownError("No schema payload generated");
   }
 
   Status Next(FlightPayload* payload) {
-    if (stage_ == Stage::NEW) {
-      RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
-      if (!current_batch_) {
-        // Signal that iteration is over
-        payload->ipc_message.metadata = nullptr;
-        return Status::OK();
+    // If we have previous payloads (dictionary messages or previous record 
batches)
+    // return them first before reading next record batch.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+    if (!batch) {
+      // End of stream
+      if (writer_) {
+        RETURN_NOT_OK(writer_->Close());
       }
-      ARROW_ASSIGN_OR_RAISE(dictionaries_,
-                            ipc::CollectDictionaries(*current_batch_, 
mapper_));
-      stage_ = Stage::DICTIONARY;
+      payload->ipc_message.metadata = nullptr;
+      return Status::OK();
     }
 
-    if (stage_ == Stage::DICTIONARY) {
-      if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
-        stage_ = Stage::RECORD_BATCH;
-        return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
-                                          &payload->ipc_message);
-      } else {
-        return GetNextDictionary(payload);
+    // Check if writer is already initialized or if schema has changed.
+    // To recreate the writer.
+    if (!writer_ || !batch->schema()->Equals(*reader_->schema())) {
+      if (writer_) {
+        RETURN_NOT_OK(writer_->Close());
+      }
+
+      // Create new writer with the batch's schema
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), batch->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+      if (!payload_list_.empty()) {
+        // Drop Schema message if it was generated.
+        // We want new Dictionary or RecordBatch messages only.
+        payload_list_.pop_front();
       }
     }
 
-    RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
+    // One WriteRecordBatch call might generate multiple payloads, so we
+    // need to collect them in a list.
+    RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
 
-    // TODO(ARROW-10787): Delta dictionaries
-    if (!current_batch_) {
-      // Signal that iteration is over
-      payload->ipc_message.metadata = nullptr;
+    // Return the first generated payload after WriteRecordBatch. There must be
+    // at least one payload generated.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
       return Status::OK();
-    } else {
-      return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
-                                        &payload->ipc_message);
     }
+
+    return Status::UnknownError("IPC writer didn't produce any payloads");
   }
 
-  Status Close() { return reader_->Close(); }
+  Status Close() {
+    if (writer_) {
+      RETURN_NOT_OK(writer_->Close());
+    }
+    return reader_->Close();
+  }
 
  private:
-  Status GetNextDictionary(FlightPayload* payload) {
-    const auto& it = dictionaries_[dictionary_index_++];
-    return ipc::GetDictionaryPayload(it.first, it.second, ipc_options_,
-                                     &payload->ipc_message);
-  }
+  // Simple payload writer that uses a list of generated payloads.
+  class ServerRecordBatchPayloadWriter : public 
ipc::internal::IpcPayloadWriter {
+   public:
+    explicit ServerRecordBatchPayloadWriter(std::list<FlightPayload>* 
payload_list)
+        : payload_list_(payload_list), first_payload_(true) {}
+
+    Status Start() override { return Status::OK(); }
+
+    Status WritePayload(const ipc::IpcPayload& ipc_payload) override {
+      FlightPayload payload;
+      payload.ipc_message = ipc_payload;
+
+      if (first_payload_) {
+        if (ipc_payload.type != ipc::MessageType::SCHEMA) {
+          return Status::Invalid("First IPC message should be schema");
+        }
+        first_payload_ = false;
+      }

Review Comment:
   Do we really need to track first_payload_ and perform this check, or can we 
just trust that the IPC writer does its job?



##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+    }
+
+    // Return the current payload (schema)
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+    return Status::UnknownError("No schema payload generated");
   }
 
   Status Next(FlightPayload* payload) {
-    if (stage_ == Stage::NEW) {
-      RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
-      if (!current_batch_) {
-        // Signal that iteration is over
-        payload->ipc_message.metadata = nullptr;
-        return Status::OK();
+    // If we have previous payloads (dictionary messages or previous record 
batches)
+    // return them first before reading next record batch.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }

Review Comment:
   You could do something like
   
   ```
   if (payload_list_.empty()) {
     // Invoke the writer
   }
   
   if (payload_list_.empty()) {
     // Error
   }
   
   // Return first payload
   ```



##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,139 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+    }
+
+    // Return the current payload (schema)
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+    return Status::UnknownError("No schema payload generated");
   }
 
   Status Next(FlightPayload* payload) {
-    if (stage_ == Stage::NEW) {
-      RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
-      if (!current_batch_) {
-        // Signal that iteration is over
-        payload->ipc_message.metadata = nullptr;
-        return Status::OK();
+    // If we have previous payloads (dictionary messages or previous record 
batches)
+    // return them first before reading next record batch.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+    if (!batch) {
+      // End of stream
+      if (writer_) {
+        RETURN_NOT_OK(writer_->Close());
       }
-      ARROW_ASSIGN_OR_RAISE(dictionaries_,
-                            ipc::CollectDictionaries(*current_batch_, 
mapper_));
-      stage_ = Stage::DICTIONARY;
+      payload->ipc_message.metadata = nullptr;
+      return Status::OK();
     }
 
-    if (stage_ == Stage::DICTIONARY) {
-      if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
-        stage_ = Stage::RECORD_BATCH;
-        return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
-                                          &payload->ipc_message);
-      } else {
-        return GetNextDictionary(payload);
+    // Check if writer is already initialized or if schema has changed.
+    // To recreate the writer.
+    // TODO: Investigate why schema changed is needed for a flight-sql 
specific test.

Review Comment:
   Which test? I don't think this is expected; the tests are probably buggy, in 
that we never validated things before.



##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();

Review Comment:
   Why not ARROW_ASSIGN_OR_RAISE?



##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+    }
+
+    // Return the current payload (schema)
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+    return Status::UnknownError("No schema payload generated");
   }
 
   Status Next(FlightPayload* payload) {
-    if (stage_ == Stage::NEW) {
-      RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
-      if (!current_batch_) {
-        // Signal that iteration is over
-        payload->ipc_message.metadata = nullptr;
-        return Status::OK();
+    // If we have previous payloads (dictionary messages or previous record 
batches)
+    // return them first before reading next record batch.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+    if (!batch) {
+      // End of stream
+      if (writer_) {
+        RETURN_NOT_OK(writer_->Close());
       }
-      ARROW_ASSIGN_OR_RAISE(dictionaries_,
-                            ipc::CollectDictionaries(*current_batch_, 
mapper_));
-      stage_ = Stage::DICTIONARY;
+      payload->ipc_message.metadata = nullptr;
+      return Status::OK();
     }
 
-    if (stage_ == Stage::DICTIONARY) {
-      if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
-        stage_ = Stage::RECORD_BATCH;
-        return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
-                                          &payload->ipc_message);
-      } else {
-        return GetNextDictionary(payload);
+    // Check if writer is already initialized or if schema has changed.
+    // To recreate the writer.

Review Comment:
   I'm not actually sure what that test is about... we don't support changing 
schemas on the same stream (this was discussed on the mailing list a while back 
and discarded for lack of anyone to push it forward with all the bikeshedding).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to