Hi Brad,

We are just learning how to use gRPC and we are seeing some behavior that is hard to understand. Let me give you the entire picture:
We are using the gRPC C++ async API on the server and Swift on the client. Our goal is to read a file and stream it in chunks to the Swift clients.

This is our server code (it is really simple):
-------------------------------------------------------------------------
#include "Streamer_grpc.h"

namespace Illuscio {

Streamer_GRPC::~Streamer_GRPC() {
    _server->Shutdown();
    _queue->Shutdown();
};

void Streamer_GRPC::Run( uint16_t port ) {
    std::string server_address = "0.0.0.0:" + std::to_string( port );

    ServerBuilder builder;
    builder.AddListeningPort( server_address, grpc::InsecureServerCredentials() );
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService( &_service );
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    _queue = builder.AddCompletionQueue();
    // Finally assemble the server.
    _server = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRPCs();
}

Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }, _responder { &_context } {
    _tag.id = MessageID::TMP_FILE;
    _tag.data = this;
    Proceed();
}

void Streamer_GRPC::TMPFileData::Proceed() {
    switch ( _status ) {
    case CallStatus::CREATE: {
        _status = CallStatus::OPEN_FILE;
        _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, (void*) this );
        break;
    }
    case CallStatus::OPEN_FILE: {
        myfile.open(_clientFileReq.file_path(), std::ios::binary);
        if (myfile.is_open()) {
            _status = CallStatus::PROCESS;
            std::cout << "File was successfuly open..." << std::endl;
            static_cast<CallData*>(this)->Proceed();
        } else {
            // Handles file opening failure
            std::cout << "File open operation failed..." << std::endl;
            _responder.Finish(Status::CANCELLED, nullptr);
            return;
        }
        break;
    }
    case CallStatus::PROCESS: {
        new TMPFileData { _service, _cq };
        if (!myfile.eof()) {
            const int read_size = static_cast<int>(myfile.gcount());
            _fileChunk.set_chunk_data(buffer, read_size);
            _responder.Write(_fileChunk, this);
        } else {
            std::cout << "EOF reached..." << std::endl;
            _status = CallStatus::FINISH;
            _responder.Finish(Status::OK, this);
        }
        break;
    }
    default: {
        delete this;
    }
    }
}

void Streamer_GRPC::HandleRPCs() {
    new TMPFileData { &_service, _queue.get() };
    void* tag;
    bool ok;
    while ( _queue->Next(&tag, &ok) ) {
        // GPR_ASSERT(_queue->Next(&tag, &ok));
        // GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}

};

This is our proto file:
--------------------------------
syntax = "proto3";

package Illuscio;

service PROTO_Streamer {
    rpc StreamFile(TMP_FileRequest) returns (stream TMP_FileChunk);
}

message TMP_FileRequest {
    string file_path = 1;
}

message TMP_FileChunk {
    bytes chunk_data = 1;
}

And lastly, this is our client code snippet:
-----------------------------------------------------------------
The method below calls the actual gRPC client method (the `StreamFile` rpc). It just calls next, waits for the server to write a chunk, and appends the data until there is no more data to append.

func getContentData(for sourceURL: URL) async throws -> Data {
    do {
        try connection.connect()
    } catch {
        throw MCError.serverConnectionError
    }

    // Make the gRPC call to fetch the file's data asynchronously
    Task {
        var receivedData = Data()
        var fileChunkIterator = (connection.client?.getAsset(with: sourceURL) as! GRPCAsyncResponseStream<Illuscio_TMP_FileChunk>).makeAsyncIterator()
        do {
            var fileChunk = try await fileChunkIterator.next()
            // Concurrently fetch and append chunks to receivedData
            while fileChunk != nil {
                receivedData.append(fileChunk!.chunkData)
                fileChunk = try await fileChunkIterator.next()
            }
            try? connection.disconnect()
        } catch (let error) {
            throw MCError.fileWriteToLocalFailed
        }
    }

    // Once the call completes, disconnect from the server
    return Data()
}

*Recently we have been trying both approaches (async and callbacks), since we are streaming chunks of a file to the Swift clients (our server is C++).*

*Our observations are:*

*1.- The byte rate received on the client when using callbacks is between 25-40 Mb/s.*

*2.- The byte rate received on the client when using the async API is between 0-97 Kb/s (which is not good).*

*We honestly expected the async API to perform even better, which doesn't seem to be the case.*

*Any idea what we are missing?*

On Thursday, August 31, 2023 at 1:47:01 PM UTC-5 Pedro Alfonso wrote:

> Thanks Brad and yes it is correct, needed to instantiate it. Problem
> solved...
>
> On Wednesday, August 30, 2023 at 4:32:30 PM UTC-5 Brad Town wrote:
>
>> I think the problem is that the Illuscio::FileChunk object is created on
>> the stack, StartWrite is called with that chunk, and the object goes out
>> of scope. (I can't be sure because I don't know how Illuscio::FileChunk is
>> implemented, but I'm assuming it's writing the data to a buffer internal to
>> Illuscio::FileChunk.) I believe the write operation is trying to access
>> a buffer that's no longer available.
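
The lifetime issue described in the quoted reply above can be sketched without gRPC. This is only an illustration under stated assumptions: `Chunk`, `FakeAsyncWriter`, `Splitter`, and `SendAll` are hypothetical stand-ins, not gRPC types, and the only behavior assumed is that an asynchronous write keeps a pointer to the message until completion is signalled. Holding the chunk as a member keeps it alive for that whole window.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a protobuf message such as Illuscio::FileChunk.
struct Chunk {
    std::string data;
};

// Hypothetical stand-in for an asynchronous writer: StartWrite only records
// the pointer, and the bytes are read later, when Complete() runs.
class FakeAsyncWriter {
public:
    void StartWrite(const Chunk* chunk) { pending_ = chunk; }

    // Simulates the runtime finishing the write some time after StartWrite.
    // Returns false if no write was pending.
    bool Complete() {
        if (pending_ == nullptr) return false;
        sent_.push_back(pending_->data);  // the chunk must still be alive here
        pending_ = nullptr;
        return true;
    }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    const Chunk* pending_ = nullptr;
    std::vector<std::string> sent_;
};

// The chunk is a member, so it outlives each NextWrite call and is only
// overwritten after the previous write has completed.
class Splitter {
public:
    Splitter(std::string source, std::size_t chunk_size)
        : source_(std::move(source)), chunk_size_(chunk_size) {}

    // Starts the next write; returns false when there is nothing left to send.
    bool NextWrite(FakeAsyncWriter& writer) {
        if (chunk_size_ == 0 || offset_ >= source_.size()) return false;
        chunk_.data = source_.substr(offset_, chunk_size_);
        offset_ += chunk_.data.size();
        writer.StartWrite(&chunk_);
        return true;
    }

private:
    std::string source_;
    std::size_t chunk_size_;
    std::size_t offset_ = 0;
    Chunk chunk_;  // member, not a local variable
};

// Drives the splitter with one write in flight at a time, the way a
// completion callback would.
std::vector<std::string> SendAll(const std::string& source, std::size_t chunk_size) {
    FakeAsyncWriter writer;
    Splitter splitter(source, chunk_size);
    while (splitter.NextWrite(writer)) {
        const bool completed = writer.Complete();
        assert(completed);
        (void)completed;
    }
    return writer.sent();
}
```

For example, `SendAll("abcdefg", 3)` yields the chunks `abc`, `def`, and `g`. In the real reactor the equivalent change would presumably be to make the `Illuscio::FileChunk` a member of `FileSplitter` instead of a local in `NextWrite`.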
>> >> Brad Town >> >> On Wed, Aug 23, 2023 at 3:55 PM Pedro Alfonso <pedroalfo...@gmail.com> >> wrote: >> >>> I'm getting below error ("Bus error"), when running the c++ server: >>> >>> dev-telemetry@telemetry build % ./file_streaming_server >>> >>> >>> Server listening on 0.0.0.0:8070 >>> >>> Buffer contents: 67 76 68 46 73 76 76 85 83 67 73 79 2 0 0 0 104 69 0 0 >>> 27 0 0 0 -35 -65 -93 2 0 0 0 0 -17 -63 88 71 0 0 0 0 16 8 0 0 0 0 0 0 0 0 0 >>> 0 0 0 0 0 0 0 0 0 100 -68 75 12 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 >>> 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 78 -65 109 76 53 0 0 -96 -123 19 0 0 >>> 0 0 0 4 -97 5 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 -73 -31 -67 68 97 >>> 126 62 -120 67 -6 62 88 -80 -63 62 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 0 0 >>> 1 0 0 0 0 78 0 0 -40 83 0 0 -92 36 25 0 0 0 0 0 -56 -41 8 0 0 0 0 0 60 -37 >>> 117 -66 -64 32 -107 57 -96 -73 -31 -67 -128 96 -120 59 -48 104 122 62 112 >>> 66 9 62 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 3 0 0 0 2 0 0 0 0 78 0 0 25 35 0 0 >>> 108 -4 33 0 0 0 0 0 -93 -77 3 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 >>> -73 -31 -67 56 -104 -15 -67 97 -77 -6 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 >>> 0 0 0 0 4 0 0 0 3 0 0 0 0 78 0 0 106 18 0 0 15 -80 37 0 0 0 0 0 46 -15 1 0 >>> 0 0 0 0 60 -37 117 -66 -64 32 -107 57 0 81 73 -67 -84 83 55 -66 -127 72 123 >>> 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 5 0 0 0 4 0 0 0 0 78 0 0 21 >>> 27 0 0 61 -95 39 0 0 0 0 0 55 -37 2 0 0 0 0 0 60 -37 117 -66 -61 114 -4 60 >>> -64 -125 -104 -68 116 -105 86 -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 >>> 0 0 0 0 0 0 0 0 6 0 0 0 5 0 0 0 1 78 0 0 -55 0 0 0 116 124 42 0 0 0 0 0 51 >>> 21 0 0 0 0 0 0 60 -37 117 -66 -15 -64 60 61 -64 -125 -104 -68 88 57 102 -66 >>> -127 72 123 61 0 -91 91 -69 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 7 0 0 0 5 0 0 0 >>> 0 78 0 0 83 15 0 0 -89 -111 42 0 0 0 0 0 -63 -99 1 0 0 0 0 0 60 -37 117 -66 >>> -15 -64 60 61 0 -91 91 -69 88 57 102 -66 -127 72 123 61 0 53 67 60 0 0 0 0 >>> 0 0 0 0 0 0 0 0 0 0 0 0 8 0 0 0 7 0 0 0 1 
78 0 0 -91 2 0 0 104 47 44 0 0 0 >>> 0 0 103 71 0 0 0 0 0 0 74 10 110 -66 -15 -64 60 61 -64 75 -116 59 88 57 102 >>> -66 -71 4 92 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 9 0 0 0 7 0 0 0 >>> 1 78 0 0 -103 9 0 0 -49 118 44 0 0 0 0 0 35 3 1 0 0 0 0 0 74 10 110 -66 -71 >>> 4 92 61 0 -91 91 -69 88 57 102 -66 -127 72 123 61 -64 75 -116 59 0 0 0 0 0 >>> 0 0 0 0 0 0 0 0 0 0 0 10 0 0 0 7 0 0 0 1 78 0 0 59 16 0 0 -14 121 45 0 0 0 >>> 0 0 57 -74 1 0 0 0 0 0 74 10 110 -66 -71 4 92 61 -64 75 -116 59 88 57 102 >>> -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 11 0 0 0 5 0 >>> 0 0 0 78 0 0 52 26 0 0 43 48 47 0 0 0 0 0 124 -61 2 0 0 0 0 0 88 57 102 -66 >>> -61 114 -4 60 0 -91 91 -69 116 -105 86 -66 -15 -64 60 61 0 53 67 60 0 0 0 0 >>> 0 0 0 0 0 0 0 0 0 0 0 0 12 0 0 0 11 0 0 0 1 78 0 0 6 0 0 0 -89 -13 49 0 0 0 >>> 0 0 -94 0 0 0 0 0 0 0 88 57 102 -66 -61 114 -4 60 -64 75 -116 59 102 104 94 >>> -66 41 125 29 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 13 0 0 0 11 0 0 >>> 0 1 78 0 0 30 0 0 0 73 -12 49 0 0 0 0 0 42 3 0 0 0 0 0 0 88 57 102 -66 41 >>> 125 29 61 0 -91 91 -69 102 104 94 -66 -15 -64 60 61 -64 75 -116 59 0 0 0 0 >>> 0 0 0 0 >>> >>> Buffer size: 1024 bytes >>> >>> Read size: 1024 bytes >>> >>> zsh: bus error ./file_streaming_server >>> >>> dev-telemetry@telemetry build % >>> >>> Above I'm printing the buffer's content and size I'm trying to stream to >>> the Swift client. 
>>>
>>> And here's the server code:
>>>
>>> #include <iostream>
>>> #include <memory>
>>> #include <string>
>>> #include <fstream>
>>> #include <dirent.h>
>>> #include <algorithm>
>>>
>>> #include <grpc/grpc.h>
>>> #include <grpcpp/security/server_credentials.h>
>>> #include <grpcpp/ext/proto_server_reflection_plugin.h>
>>> #include <grpcpp/server.h>
>>> #include <grpcpp/server_builder.h>
>>> #include <grpcpp/server_context.h>
>>> #include "file_streaming.grpc.pb.h"
>>>
>>> using grpc::CallbackServerContext;
>>> using grpc::Server;
>>> using grpc::ServerUnaryReactor;
>>> using grpc::ServerBuilder;
>>> using grpc::ServerContext;
>>> using grpc::ServerReader;
>>> using grpc::ServerReaderWriter;
>>> using grpc::ServerWriter;
>>> using grpc::Status;
>>>
>>> class FileStreamingImpl final : public Illuscio::FileStreaming::CallbackService {
>>> public:
>>>     grpc::ServerWriteReactor<Illuscio::FileChunk>* GetFile(grpc::CallbackServerContext* context, const Illuscio::FileRequest* request) override {
>>>         class FileSplitter : public grpc::ServerWriteReactor<Illuscio::FileChunk> {
>>>         private:
>>>             const int chunk_size = 1024;
>>>             char buffer[1024];
>>>             std::ifstream file;
>>>             //
>>>         public:
>>>             FileSplitter(const std::string& file_path) : file(file_path, std::ios::binary) {
>>>                 try {
>>>                     if (!file) {
>>>                         Finish(grpc::Status(grpc::StatusCode::INTERNAL, "Error opening file"));
>>>                         return;
>>>                     }
>>>
>>>                     NextWrite();
>>>                 } catch (const std::exception& ex) {
>>>                     Finish(grpc::Status(grpc::StatusCode::INTERNAL, ex.what()));
>>>                 }
>>>             }
>>>             //
>>>             void OnDone() override { delete this; }
>>>             void OnWriteDone(bool /*ok*/) override {
>>>                 std::cout << "Another write..." << std::endl;
>>>                 NextWrite();
>>>             }
>>>
>>>         private:
>>>             void NextWrite() {
>>>                 file.read(buffer, chunk_size);
>>>                 const int read_size = static_cast<int>(file.gcount());
>>>
>>>                 if (read_size > 0) {
>>>                     Illuscio::FileChunk chunk;
>>>                     chunk.set_chunk_data(buffer, read_size);
>>>                     std::cout << "Buffer contents: ";
>>>                     //
>>>                     for (int i = 0; i < read_size; ++i) {
>>>                         std::cout << static_cast<int>(buffer[i]) << " ";
>>>                     }
>>>                     std::cout << std::endl;
>>>                     std::cout << "Buffer size: " << sizeof(buffer) << " bytes" << std::endl;
>>>                     std::cout << "Read size: " << read_size << " bytes" << std::endl;
>>>                     //
>>>                     StartWrite(&chunk);
>>>                 } else {
>>>                     // All data has been sent, finish the stream
>>>                     Finish(grpc::Status::OK);
>>>                 }
>>>             }
>>>         };
>>>
>>>         return new FileSplitter(request->file_path());
>>>     }
>>>
>>> .......
>>>
>>> I have tried with several buffer sizes from 1024 bytes to 1MB, but still
>>> getting the "Bus error" message.
>>>
>>> Any thoughts?
>>>
>>>
>>> Thanks in advance.
>>>
>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "grpc.io" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to grpc-io+u...@googlegroups.com.
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/grpc-io/5edbaedb-681d-44a3-ba1f-8e8d0c896b69n%40googlegroups.com
>>>
>>
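
The quoted `NextWrite` pairs each `gcount()` with a preceding `read()`. In the asynchronous PROCESS state posted at the top of this thread, no `read()` call is visible in the snippet as shown, which may be worth checking. Below is a minimal sketch of the read-then-gcount pattern using only the standard library. `ReadChunks` and `ReadChunksFromString` are hypothetical helpers, not part of gRPC, and the helper takes any `std::istream` so it can be tried on in-memory data.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: reads `in` in pieces of at most `chunk_size` bytes.
// gcount() reports the bytes extracted by the most recent unformatted read,
// so it is only meaningful right after a read() call.
std::vector<std::string> ReadChunks(std::istream& in, std::size_t chunk_size) {
    std::vector<std::string> chunks;
    if (chunk_size == 0) return chunks;

    std::vector<char> buffer(chunk_size);
    while (in) {
        in.read(buffer.data(), static_cast<std::streamsize>(buffer.size()));
        const std::streamsize read_size = in.gcount();
        if (read_size <= 0) break;  // nothing left: a server would finish the stream here
        assert(static_cast<std::size_t>(read_size) <= buffer.size());
        // A short final read still carries data, so it is kept as a chunk.
        chunks.emplace_back(buffer.data(), static_cast<std::size_t>(read_size));
    }
    return chunks;
}

// Convenience wrapper for trying the helper on in-memory data.
std::vector<std::string> ReadChunksFromString(const std::string& data, std::size_t chunk_size) {
    std::istringstream in(data);
    return ReadChunks(in, chunk_size);
}
```

For example, `ReadChunksFromString("abcdefg", 3)` produces three chunks, the last holding the single remaining byte. Testing `read_size` rather than `eof()` alone means the final partial chunk is sent and an input whose size is an exact multiple of the chunk size ends cleanly.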