Hi Brad,

We are just learning how to use gRPC and are seeing some behavior that is 
hard to understand. Let me give you the whole picture:

We are using the gRPC C++ async API on the server and Swift on the 
client.
Our goal is to read a file and stream it in chunks to the Swift clients.

This is our server code (code is really simple):
-------------------------------------------------------------------------

#include "Streamer_grpc.h"

namespace Illuscio {

Streamer_GRPC::~Streamer_GRPC() {
    _server->Shutdown();
    _queue->Shutdown();
};

void Streamer_GRPC::Run( uint16_t port ) {
    std::string server_address = "0.0.0.0:" + std::to_string( port );

    ServerBuilder builder;

    builder.AddListeningPort( server_address, grpc::InsecureServerCredentials() );
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService( &_service );
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    _queue = builder.AddCompletionQueue();
    // Finally assemble the server.
    _server = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRPCs();
}


Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }, _responder { &_context } {
    _tag.id = MessageID::TMP_FILE;
    _tag.data = this;
    Proceed();
}

void Streamer_GRPC::TMPFileData::Proceed() {
    switch ( _status ) {
    case CallStatus::CREATE: {
        _status = CallStatus::OPEN_FILE;
        _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, (void*) this );
        break;
    }

    case CallStatus::OPEN_FILE: {
        myfile.open(_clientFileReq.file_path(), std::ios::binary);
           
        if (myfile.is_open()) {
            _status = CallStatus::PROCESS;
            std::cout << "File was successfuly open..." << std::endl;
            static_cast<CallData*>(this)->Proceed();
        } else {
            // Handles file opening failure
            std::cout << "File open operation failed..." << std::endl;
            _responder.Finish(Status::CANCELLED, nullptr);
            return;
        }
   
        break;
    }

    case CallStatus::PROCESS: {
        new TMPFileData { _service, _cq };
   
        if (!myfile.eof()) {
            const int read_size = static_cast<int>(myfile.gcount());
            _fileChunk.set_chunk_data(buffer, read_size);
            _responder .Write(_fileChunk, this);
        } else {
            std::cout << "EOF reached..." << std::endl;
            _status = CallStatus::FINISH;
            _responder.Finish(Status::OK, this);
        }

        break;
    }  
   
    default: {
        delete this;
    }
    }
}

void Streamer_GRPC::HandleRPCs() {
    new TMPFileData { &_service, _queue.get() };
    void* tag;
    bool ok;

    while ( _queue->Next(&tag, &ok) ) {
       // GPR_ASSERT(_queue->Next(&tag, &ok));
       // GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}
};
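
A side note on the PROCESS step above: gcount() and eof() both describe the most recent read on the stream, so a read() has to come before them, and the snippet as posted does not show one. Below is a minimal sketch of the read-then-gcount pattern using only the standard library. The helper names read_chunk and split_stream are hypothetical and are not part of our server; the string returned by read_chunk stands in for what would be passed to set_chunk_data().

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: reads up to `chunk_size` bytes from `in` and returns
// them. An empty result means there is nothing left to send.
std::string read_chunk(std::istream& in, std::size_t chunk_size) {
    std::vector<char> buffer(chunk_size);
    in.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
    // gcount() reports how many bytes the read() just above extracted; the
    // final chunk of a file is usually shorter than chunk_size.
    const std::streamsize got = in.gcount();
    return std::string(buffer.data(), static_cast<std::size_t>(got));
}

// Hypothetical helper: splits a whole stream into chunks, one read per chunk,
// the way successive PROCESS steps would.
std::vector<std::string> split_stream(std::istream& in, std::size_t chunk_size) {
    std::vector<std::string> chunks;
    for (std::string chunk = read_chunk(in, chunk_size); !chunk.empty();
         chunk = read_chunk(in, chunk_size)) {
        chunks.push_back(chunk);
    }
    return chunks;
}
```

In the server, each non-empty chunk would correspond to one Write() call and the first empty result to Finish(). The sketch uses an empty read rather than eof() as the stop condition, because eof() can already be set while the last partial chunk still has to be sent.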

This is our proto file:
--------------------------------
syntax = "proto3";
package Illuscio;

service PROTO_Streamer {
    rpc StreamFile(TMP_FileRequest) returns (stream TMP_FileChunk);
}

message TMP_FileRequest {
    string file_path = 1;
}

message TMP_FileChunk {
    bytes chunk_data = 1;
}

And lastly, this is our client code snippet:
-----------------------------------------------------------------

The method below calls the actual gRPC client method (the `StreamFile` rpc). It 
just calls next(), waits for the server to write a chunk, and appends the 
data until there is no more data to append.

func getContentData(for sourceURL: URL) async throws -> Data {
        do {
            try connection.connect()
        } catch { throw MCError.serverConnectionError }

        // Make the gRPC call to fetch the file's data asynchronously
       
        Task {
            var receivedData = Data()
            var fileChunkIterator = (connection.client?.getAsset(with: sourceURL)
                as! GRPCAsyncResponseStream<Illuscio_TMP_FileChunk>).makeAsyncIterator()
            do {
                var fileChunk = try await fileChunkIterator.next()
                // Concurrently fetch and append chunks to receivedData
                while fileChunk != nil {
                    receivedData.append(fileChunk!.chunkData)
                    fileChunk = try await fileChunkIterator.next()
                }
               
                try? connection.disconnect()
            } catch (let error) {
                throw MCError.fileWriteToLocalFailed
            }
        }
            // Once the call completes, disconnect from the server
            return Data()
    }


*Recently we have been trying both approaches (async and callbacks), since 
we are streaming chunks of a file to the Swift clients (our server is C++).*
*Our observations are:*
*1.- The byte rate received on the client when using callbacks is between 
25-40 Mb/s.*
*2.- The byte rate received on the client when using the async APIs is 
between 0-97 Kb/s (which is not good).*
*We honestly expected the async APIs to perform even better, 
which doesn't seem to be the case.*

*Any idea what we are missing?*
On Thursday, August 31, 2023 at 1:47:01 PM UTC-5 Pedro Alfonso wrote:

> Thanks Brad and yes it is correct, needed to instantiate it. Problem 
> solved...
>
> On Wednesday, August 30, 2023 at 4:32:30 PM UTC-5 Brad Town wrote:
>
>> I think the problem is that the Illuscio::FileChunk object is created on 
>> the stack, StartWrite is called with that chunk, and the object goes out 
>> of scope. (I can't be sure because I don't know how Illuscio::FileChunk is 
>> implemented, but I'm assuming it's writing the data to a buffer internal to 
>> Illuscio::FileChunk.) I believe the write operation is trying to access 
>> a buffer that's no longer available.
>>
>> Brad Town
>>
>> On Wed, Aug 23, 2023 at 3:55 PM Pedro Alfonso <pedroalfo...@gmail.com> 
>> wrote:
>>
>>> I'm getting below error ("Bus error"), when running the c++ server:
>>>
>>> dev-telemetry@telemetry build % ./file_streaming_server
>>>
>>> Server listening on 0.0.0.0:8070
>>>
>>> Buffer contents: 67 76 68 46 73 76 76 85 83 67 73 79 2 0 0 0 104 69 0 0 
>>> 27 0 0 0 -35 -65 -93 2 0 0 0 0 -17 -63 88 71 0 0 0 0 16 8 0 0 0 0 0 0 0 0 0 
>>> 0 0 0 0 0 0 0 0 0 100 -68 75 12 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 
>>> 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 78 -65 109 76 53 0 0 -96 -123 19 0 0 
>>> 0 0 0 4 -97 5 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 -73 -31 -67 68 97 
>>> 126 62 -120 67 -6 62 88 -80 -63 62 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 0 0 
>>> 1 0 0 0 0 78 0 0 -40 83 0 0 -92 36 25 0 0 0 0 0 -56 -41 8 0 0 0 0 0 60 -37 
>>> 117 -66 -64 32 -107 57 -96 -73 -31 -67 -128 96 -120 59 -48 104 122 62 112 
>>> 66 9 62 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 3 0 0 0 2 0 0 0 0 78 0 0 25 35 0 0 
>>> 108 -4 33 0 0 0 0 0 -93 -77 3 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 
>>> -73 -31 -67 56 -104 -15 -67 97 -77 -6 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 
>>> 0 0 0 0 4 0 0 0 3 0 0 0 0 78 0 0 106 18 0 0 15 -80 37 0 0 0 0 0 46 -15 1 0 
>>> 0 0 0 0 60 -37 117 -66 -64 32 -107 57 0 81 73 -67 -84 83 55 -66 -127 72 123 
>>> 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 5 0 0 0 4 0 0 0 0 78 0 0 21 
>>> 27 0 0 61 -95 39 0 0 0 0 0 55 -37 2 0 0 0 0 0 60 -37 117 -66 -61 114 -4 60 
>>> -64 -125 -104 -68 116 -105 86 -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 
>>> 0 0 0 0 0 0 0 0 6 0 0 0 5 0 0 0 1 78 0 0 -55 0 0 0 116 124 42 0 0 0 0 0 51 
>>> 21 0 0 0 0 0 0 60 -37 117 -66 -15 -64 60 61 -64 -125 -104 -68 88 57 102 -66 
>>> -127 72 123 61 0 -91 91 -69 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 7 0 0 0 5 0 0 0 
>>> 0 78 0 0 83 15 0 0 -89 -111 42 0 0 0 0 0 -63 -99 1 0 0 0 0 0 60 -37 117 -66 
>>> -15 -64 60 61 0 -91 91 -69 88 57 102 -66 -127 72 123 61 0 53 67 60 0 0 0 0 
>>> 0 0 0 0 0 0 0 0 0 0 0 0 8 0 0 0 7 0 0 0 1 78 0 0 -91 2 0 0 104 47 44 0 0 0 
>>> 0 0 103 71 0 0 0 0 0 0 74 10 110 -66 -15 -64 60 61 -64 75 -116 59 88 57 102 
>>> -66 -71 4 92 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 9 0 0 0 7 0 0 0 
>>> 1 78 0 0 -103 9 0 0 -49 118 44 0 0 0 0 0 35 3 1 0 0 0 0 0 74 10 110 -66 -71 
>>> 4 92 61 0 -91 91 -69 88 57 102 -66 -127 72 123 61 -64 75 -116 59 0 0 0 0 0 
>>> 0 0 0 0 0 0 0 0 0 0 0 10 0 0 0 7 0 0 0 1 78 0 0 59 16 0 0 -14 121 45 0 0 0 
>>> 0 0 57 -74 1 0 0 0 0 0 74 10 110 -66 -71 4 92 61 -64 75 -116 59 88 57 102 
>>> -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 11 0 0 0 5 0 
>>> 0 0 0 78 0 0 52 26 0 0 43 48 47 0 0 0 0 0 124 -61 2 0 0 0 0 0 88 57 102 -66 
>>> -61 114 -4 60 0 -91 91 -69 116 -105 86 -66 -15 -64 60 61 0 53 67 60 0 0 0 0 
>>> 0 0 0 0 0 0 0 0 0 0 0 0 12 0 0 0 11 0 0 0 1 78 0 0 6 0 0 0 -89 -13 49 0 0 0 
>>> 0 0 -94 0 0 0 0 0 0 0 88 57 102 -66 -61 114 -4 60 -64 75 -116 59 102 104 94 
>>> -66 41 125 29 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 13 0 0 0 11 0 0 
>>> 0 1 78 0 0 30 0 0 0 73 -12 49 0 0 0 0 0 42 3 0 0 0 0 0 0 88 57 102 -66 41 
>>> 125 29 61 0 -91 91 -69 102 104 94 -66 -15 -64 60 61 -64 75 -116 59 0 0 0 0 
>>> 0 0 0 0 
>>>
>>> Buffer size: 1024 bytes
>>>
>>> Read size: 1024 bytes
>>>
>>> zsh: bus error  ./file_streaming_server
>>>
>>> dev-telemetry@telemetry build % 
>>>
>>> Above I'm printing the buffer's content and size I'm trying to stream to 
>>> the Swift client.
>>>
>>> And here's the server code:
>>>
>>> #include <iostream>
>>> #include <memory>
>>> #include <string>
>>> #include <fstream>
>>> #include <dirent.h>
>>> #include <algorithm>
>>>
>>> #include <grpc/grpc.h>
>>> #include <grpcpp/security/server_credentials.h>
>>> #include <grpcpp/ext/proto_server_reflection_plugin.h>
>>> #include <grpcpp/server.h>
>>> #include <grpcpp/server_builder.h>
>>> #include <grpcpp/server_context.h>
>>> #include "file_streaming.grpc.pb.h"
>>>
>>> using grpc::CallbackServerContext;
>>> using grpc::Server;
>>> using grpc::ServerUnaryReactor;
>>> using grpc::ServerBuilder;
>>> using grpc::ServerContext;
>>> using grpc::ServerReader;
>>> using grpc::ServerReaderWriter;
>>> using grpc::ServerWriter;
>>> using grpc::Status;
>>>
>>> class FileStreamingImpl final : public Illuscio::FileStreaming::CallbackService {
>>>  public:
>>>   grpc::ServerWriteReactor<Illuscio::FileChunk>* GetFile(
>>>       grpc::CallbackServerContext* context,
>>>       const Illuscio::FileRequest* request) override {
>>>     class FileSplitter : public grpc::ServerWriteReactor<Illuscio::FileChunk> {
>>>      private:
>>>       const int chunk_size = 1024;
>>>       char buffer[1024];
>>>       std::ifstream file;
>>>
>>>      public:
>>>       FileSplitter(const std::string& file_path) : file(file_path, std::ios::binary) {
>>>         try {
>>>           if (!file) {
>>>             Finish(grpc::Status(grpc::StatusCode::INTERNAL, "Error opening file"));
>>>             return;
>>>           }
>>>
>>>           NextWrite();
>>>         } catch (const std::exception& ex) {
>>>           Finish(grpc::Status(grpc::StatusCode::INTERNAL, ex.what()));
>>>         }
>>>       }
>>>
>>>       void OnDone() override { delete this; }
>>>       void OnWriteDone(bool /*ok*/) override {
>>>         std::cout << "Another write..." << std::endl;
>>>         NextWrite();
>>>       }
>>>
>>>      private:
>>>       void NextWrite() {
>>>         file.read(buffer, chunk_size);
>>>         const int read_size = static_cast<int>(file.gcount());
>>>
>>>         if (read_size > 0) {
>>>           Illuscio::FileChunk chunk;
>>>           chunk.set_chunk_data(buffer, read_size);
>>>           std::cout << "Buffer contents: ";
>>>           for (int i = 0; i < read_size; ++i) {
>>>             std::cout << static_cast<int>(buffer[i]) << " ";
>>>           }
>>>           std::cout << std::endl;
>>>           std::cout << "Buffer size: " << sizeof(buffer) << " bytes" << std::endl;
>>>           std::cout << "Read size: " << read_size << " bytes" << std::endl;
>>>
>>>           StartWrite(&chunk);
>>>         } else {
>>>           // All data has been sent, finish the stream
>>>           Finish(grpc::Status::OK);
>>>         }
>>>       }
>>>     };
>>>
>>>     return new FileSplitter(request->file_path());
>>>   }
>>>
>>> .......
>>>
>>> I have tried with several buffer sizes from 1024 bytes to 1MB, but still 
>>> getting the "Bus error" message.
>>>
>>> Any thoughts?
>>>
>>>
>>> Thanks in advance.
>>>
>>> -- 
>>> You received this message because you are subscribed to the Google 
>>> Groups "grpc.io" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to grpc-io+u...@googlegroups.com.
>>> To view this discussion on the web visit 
>>> https://groups.google.com/d/msgid/grpc-io/5edbaedb-681d-44a3-ba1f-8e8d0c896b69n%40googlegroups.com
>>>  
>>> <https://groups.google.com/d/msgid/grpc-io/5edbaedb-681d-44a3-ba1f-8e8d0c896b69n%40googlegroups.com?utm_medium=email&utm_source=footer>
>>> .
>>>
>>
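
For anyone reading the quoted thread later, here is a minimal sketch of the lifetime issue Brad describes. As far as I can tell from the header comments of the gRPC callback API, the message passed to StartWrite must stay valid until OnWriteDone is called, so a chunk declared inside NextWrite() is gone too early. The sketch does not use gRPC at all: Chunk, DeferredWriter, Splitter and demo_member_chunk are hypothetical stand-ins that only imitate "the write finishes after StartWrite returns". It shows the fix of keeping the chunk as a member of the reactor-like object.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a protobuf message such as Illuscio::FileChunk.
struct Chunk {
    std::string data;
};

// Hypothetical stand-in for an asynchronous writer: StartWrite() only records
// a pointer to the message, and the bytes are consumed later in Complete().
class DeferredWriter {
public:
    void StartWrite(const Chunk* chunk) { pending_ = chunk; }

    // Simulates the runtime finishing the write after StartWrite() returned.
    // Returns the number of bytes that were "sent".
    std::size_t Complete() {
        const std::size_t sent = pending_ ? pending_->data.size() : 0;
        pending_ = nullptr;
        return sent;
    }

private:
    const Chunk* pending_ = nullptr;
};

// The chunk is a member, so it lives as long as the splitter and is still
// valid when the deferred write completes.
class Splitter {
public:
    explicit Splitter(DeferredWriter& writer) : writer_(writer) {}

    void NextWrite(const char* buffer, std::size_t size) {
        chunk_.data.assign(buffer, size);  // reuse the member between writes
        writer_.StartWrite(&chunk_);       // pointer stays valid after return
    }

private:
    DeferredWriter& writer_;
    Chunk chunk_;
};

// Small driver: one write is started, then completed after NextWrite() has
// already returned.
std::size_t demo_member_chunk() {
    DeferredWriter writer;
    Splitter splitter(writer);
    const char bytes[] = {'a', 'b', 'c', 'd'};
    splitter.NextWrite(bytes, sizeof(bytes));
    return writer.Complete();
}
```

In the quoted FileSplitter, the equivalent change would be to declare the Illuscio::FileChunk next to buffer and file instead of inside NextWrite(), and to refill it only from OnWriteDone().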

