Hi community.

We are using gRPC C++ Async API approach on the server and Swift on the 
client.
Our goal is to read and stream a file in chunks to the Swift clients.

This is our server code (code is really simple):
-------------------------------------------------------------------------

#include "Streamer_grpc.h"

namespace Illuscio {

// Shuts the server down first and its completion queue second; the order
// matters because the completion queue must be shut down after the server.
//
// Both members are only assigned inside Run(), so they are still empty when
// the object is destroyed without Run() having been called (and `_server` is
// empty when BuildAndStart() failed). Guard against that instead of
// dereferencing a null pointer.
Streamer_GRPC::~Streamer_GRPC() {
    if ( _server ) {
        _server->Shutdown();
    }
    if ( _queue ) {
        _queue->Shutdown();
    }
}

void Streamer_GRPC::Run( uint16_t port ) {
    std::string server_address = "0.0.0.0:" + std::to_string( port );

    ServerBuilder builder;

    builder.AddListeningPort( server_address, 
grpc::InsecureServerCredentials() );
    // Register "service_" as the instance through which we'll communicate 
with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService( &_service );
    // Get hold of the completion queue used for the asynchronous 
communication
    // with the gRPC runtime.
    _queue = builder.AddCompletionQueue();
    // Finally assemble the server.
    _server = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRPCs();
}


// Creates a handler for one StreamFile call.
//
// @param service Async service passed on to the CallData base.
// @param queue   Completion queue passed on to the CallData base.
//
// The responder is bound to this handler's server context, the tag is filled
// in so that it identifies this object, and Proceed() is invoked right away
// to run the handler's first state.
Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }
    , _responder { &_context } {
    _tag.data = this;
    _tag.id = MessageID::TMP_FILE;

    Proceed();
}

// Advances this call's state machine by one step. Invoked once from the
// constructor and afterwards every time the completion queue hands back this
// handler's tag.
//
//   CREATE    -> asks gRPC to deliver the next StreamFile request here.
//   OPEN_FILE -> a request arrived: spawn the handler for the next client,
//                open the requested file and send the first chunk.
//   PROCESS   -> the previous Write completed: read and send the next chunk,
//                or Finish the call once the file is exhausted.
//   otherwise -> Finish completed: the handler destroys itself.
//
// Only one Write may be outstanding per call, so exactly one chunk is read
// and written per invocation.
void Streamer_GRPC::TMPFileData::Proceed() {
    // Maximum payload carried by a single TMP_FileChunk message.
    constexpr std::streamsize kChunkSize = 64 * 1024;

    // HandleRPCs() casts the tag back to CallData*, so hand out exactly that
    // pointer type instead of a raw cast of the derived `this`.
    void* const tag = static_cast<CallData*>( this );

    switch ( _status ) {
    case CallStatus::CREATE: {
        _status = CallStatus::OPEN_FILE;
        _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, tag );
        break;
    }

    case CallStatus::OPEN_FILE: {
        // This handler is now busy with one client; create its successor
        // exactly once per call so further clients can be accepted, whether
        // or not the file below can be opened.
        new TMPFileData { _service, _cq };

        myfile.open( _clientFileReq.file_path(), std::ios::binary );

        if ( !myfile.is_open() ) {
            // Handles file opening failure. The tag must be this handler and
            // the state must be FINISH so the completion event frees it.
            std::cout << "File open operation failed..." << std::endl;
            _status = CallStatus::FINISH;
            _responder.Finish( Status::CANCELLED, tag );
            break;
        }

        std::cout << "File was successfuly open..." << std::endl;
        _status = CallStatus::PROCESS;
        Proceed();  // send the first chunk immediately
        break;
    }

    case CallStatus::PROCESS: {
        // Read straight into the message's byte field to avoid a second copy.
        std::string* payload = _fileChunk.mutable_chunk_data();
        payload->resize( static_cast<std::string::size_type>( kChunkSize ) );
        myfile.read( &( *payload )[0], kChunkSize );

        // The last read of a file is usually short; gcount() reports how
        // many bytes were actually obtained.
        const std::streamsize bytesRead = myfile.gcount();
        payload->resize( static_cast<std::string::size_type>( bytesRead ) );

        if ( bytesRead > 0 ) {
            _responder.Write( _fileChunk, tag );
        } else {
            // Nothing left to send: either end of file or a read error.
            const bool readFailed = myfile.bad();
            if ( !readFailed ) {
                std::cout << "EOF reached..." << std::endl;
            }
            myfile.close();
            _status = CallStatus::FINISH;
            _responder.Finish( readFailed ? Status::CANCELLED : Status::OK, tag );
        }

        break;
    }

    default: {
        // FINISH: the call is over, release the handler.
        delete this;
    }
    }
}

// Server main loop: creates the first call handler and then dispatches every
// completion-queue event to the handler stored in its tag. Returns once
// Next() reports that the queue has been shut down and drained.
void Streamer_GRPC::HandleRPCs() {
    // The handler registers itself with the service from its constructor and
    // deletes itself when its call is finished.
    new TMPFileData { &_service, _queue.get() };

    void* tag = nullptr;
    bool ok = false;

    while ( _queue->Next( &tag, &ok ) ) {
        // An operation started with a null tag has no handler to resume;
        // dereferencing it would crash the server.
        if ( tag == nullptr ) {
            continue;
        }

        // NOTE(review): `ok` is not inspected. It is false when an operation
        // did not succeed (e.g. the client went away or the server is
        // shutting down); the handlers currently cannot be told about that.
        static_cast<CallData*>( tag )->Proceed();
    }
}
};

This is our proto file:
--------------------------------
syntax = "proto3";
package Illuscio;

// File streaming service.
service PROTO_Streamer {
    // Server-streaming call: one TMP_FileRequest in, a stream of
    // TMP_FileChunk messages out.
    rpc StreamFile(TMP_FileRequest) returns (stream TMP_FileChunk);
}

// Request sent by the client to start a StreamFile call.
message TMP_FileRequest {
    // Path of the file to stream; the server passes it to its file-open call.
    string file_path = 1;
}

// One piece of the streamed file.
message TMP_FileChunk {
    // Raw bytes of this piece; the client appends them in arrival order.
    bytes chunk_data = 1;
}

And lastly, this is our client code snippet:
-----------------------------------------------------------------

The method below calls the actual gRPC client method (the `StreamFile` rpc). It 
simply calls `next()` and waits for the server to write a chunk; each chunk is 
appended to the data until there are no more chunks to append.

/// Downloads the file at `sourceURL` from the server and returns its bytes.
///
/// The response stream is consumed inside this function, so the returned
/// `Data` holds every chunk received, in arrival order. (Running the loop in
/// a separate, un-awaited `Task` would let the function return before any
/// chunk arrived and would discard errors thrown inside the task.)
///
/// - Parameter sourceURL: Location of the file to request.
/// - Returns: The concatenated chunk data.
/// - Throws: `MCError.serverConnectionError` when the connection cannot be
///   established or no response stream is available;
///   `MCError.fileWriteToLocalFailed` when receiving a chunk fails.
func getContentData(for sourceURL: URL) async throws -> Data {
        do {
            try connection.connect()
        } catch { throw MCError.serverConnectionError }

        // Once the call completes (or fails), disconnect from the server
        defer { try? connection.disconnect() }

        // Make the gRPC call to fetch the file's data asynchronously.
        // A conditional cast avoids crashing when the client is missing.
        guard let fileChunks = connection.client?.getAsset(with: sourceURL)
                as? GRPCAsyncResponseStream<Illuscio_TMP_FileChunk> else {
            throw MCError.serverConnectionError
        }

        var receivedData = Data()
        do {
            // Chunks are awaited one after another and appended in order
            for try await fileChunk in fileChunks {
                receivedData.append(fileChunk.chunkData)
            }
        } catch {
            throw MCError.fileWriteToLocalFailed
        }

        return receivedData
    }

Questions:
=========
1.- Unless we are missing something, we do not see the same performance as 
with the gRPC callback implementation. We know `next` is marked as `await`, so 
our understanding is that the client will not receive the file's next chunk 
until it has been written on the server side (`callbackqueue`); only then is 
the client unblocked so that it can ask for the following chunk (the iterator's 
`next` method), and so on (since only one write is permitted at a time).
2.- With callbacks, we were able to have a loop on the server iterating 
over a file and sending one chunk after the other until we reach the file's 
end. Because of this approach we were able to receive the chunks really 
really fast on the client side (which is not happening with the gRPC async 
approach).
3.- And we are obligated to use the Async approach since we will be serving 
several clients at the same time.
4.- Are we missing something?

Thanks in advance.

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/661dd92a-657a-4128-8b47-1f0ca1e7cad6n%40googlegroups.com.

Reply via email to