raulcd commented on code in PR #47115:
URL: https://github.com/apache/arrow/pull/47115#discussion_r2225045071


##########
cpp/src/arrow/flight/server.cc:
##########
@@ -275,78 +277,137 @@ Status FlightServerBase::GetSchema(const 
ServerCallContext& context,
 
 class RecordBatchStream::RecordBatchStreamImpl {
  public:
-  // Stages of the stream when producing payloads
-  enum class Stage {
-    NEW,          // The stream has been created, but Next has not been called 
yet
-    DICTIONARY,   // Dictionaries have been collected, and are being sent
-    RECORD_BATCH  // Initial have been sent
-  };
-
   RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
                         const ipc::IpcWriteOptions& options)
-      : reader_(reader), mapper_(*reader_->schema()), ipc_options_(options) {}
+      : reader_(reader), options_(options) {}
 
   std::shared_ptr<Schema> schema() { return reader_->schema(); }
 
   Status GetSchemaPayload(FlightPayload* payload) {
-    return ipc::GetSchemaPayload(*reader_->schema(), ipc_options_, mapper_,
-                                 &payload->ipc_message);
+    if (!writer_) {
+      // Create the IPC writer on first call
+      auto payload_writer =
+          std::make_unique<ServerRecordBatchPayloadWriter>(&payload_list_);
+      auto writer_result = ipc::internal::OpenRecordBatchWriter(
+          std::move(payload_writer), reader_->schema(), options_);
+
+      if (!writer_result.ok()) {
+        return writer_result.status();
+      }
+      writer_ = std::move(writer_result).ValueOrDie();
+    }
+
+    // Return the current payload (schema)
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+    return Status::UnknownError("No schema payload generated");
   }
 
   Status Next(FlightPayload* payload) {
-    if (stage_ == Stage::NEW) {
-      RETURN_NOT_OK(reader_->ReadNext(&current_batch_));
-      if (!current_batch_) {
-        // Signal that iteration is over
-        payload->ipc_message.metadata = nullptr;
-        return Status::OK();
+    // If we have previous payloads (dictionary messages or previous record 
batches)
+    // return them first before reading next record batch.
+    if (!payload_list_.empty()) {
+      *payload = std::move(payload_list_.front());
+      payload_list_.pop_front();
+      return Status::OK();
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    RETURN_NOT_OK(reader_->ReadNext(&batch));
+
+    if (!batch) {
+      // End of stream
+      if (writer_) {
+        RETURN_NOT_OK(writer_->Close());
       }
-      ARROW_ASSIGN_OR_RAISE(dictionaries_,
-                            ipc::CollectDictionaries(*current_batch_, 
mapper_));
-      stage_ = Stage::DICTIONARY;
+      payload->ipc_message.metadata = nullptr;
+      return Status::OK();
     }
 
-    if (stage_ == Stage::DICTIONARY) {
-      if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
-        stage_ = Stage::RECORD_BATCH;
-        return ipc::GetRecordBatchPayload(*current_batch_, ipc_options_,
-                                          &payload->ipc_message);
-      } else {
-        return GetNextDictionary(payload);
+    // Check if writer is already initialized or if schema has changed.
+    // To recreate the writer.

Review Comment:
   The failing test is `TestFlightSqlServer.TestCommandGetTablesWithIncludedSchemas`, 
which failed with:
   ```c++
   
/home/raulcd/code/arrow/cpp/src/arrow/flight/sql/example/sqlite_server.cc:395: 
GetTables include_schema=1
   
/home/raulcd/code/arrow/cpp/src/arrow/flight/sql/example/sqlite_server.cc:409: 
GetTables: SELECT 'main' as catalog_name, null as schema_name, name as 
table_name, type as table_type FROM sqlite_master where 1=1 and table_name LIKE 
'int%' order by table_name
   Reading next payload from data stream: 1
   /home/raulcd/code/arrow/cpp/src/arrow/flight/sql/server_test.cc:325: Failure
   Failed
   '_error_or_value62.status()' failed with Invalid: Tried to write record 
batch with different schema
   /home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:339  
writer_->WriteRecordBatch(*batch)
   /home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:415  
impl_->Next(&payload)
   /home/raulcd/code/arrow/cpp/src/arrow/flight/transport_server.cc:291  
data_stream->Next(). gRPC client debug context: 
{"created":"@1753261907.620461787","description":"Error received from peer 
ipv4:127.0.0.1:40331","file":"/home/raulcd/code/arrow/cpp/build/grpc_ep-prefix/src/grpc_ep/src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Tried
 to write record batch with different 
schema\n/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:339  
writer_->WriteRecordBatch(*batch)\n/home/raulcd/code/arrow/cpp/src/arrow/flight/server.cc:415
  
impl_->Next(&payload)\n/home/raulcd/code/arrow/cpp/src/arrow/flight/transport_server.cc:291
  data_stream->Next()","grpc_status":3}. Client context: OK
   /home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:219  
stream_->Finish(Status::OK())
   /home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:256  Next()
   /home/raulcd/code/arrow/cpp/src/arrow/flight/client.cc:116  
ToRecordBatches(stop_token)
   ```
   The two schemas differ only in the nullability of `table_schema`: it is `binary` in the batch schema but `binary not null` in the reader schema:
   ```md
   ## Batch schema:
   catalog_name: string
   db_schema_name: string
   table_name: string not null
   table_type: string not null
   table_schema: binary
   ## Reader Schema:
   catalog_name: string
   db_schema_name: string
   table_name: string not null
   table_type: string not null
   table_schema: binary not null
   ```
   I've found the issue and fixed it here: 
https://github.com/apache/arrow/pull/47115/commits/d4305a8de40802db19bfaa27528bd94203219a56
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to