Hey,
for a project I start understanding how gRPC works. For this I implemented the 
following setup:

A C++ server using the sync API offers two services RegisterCommand(streaming) 
and NewCommandMsg(blocking). This is the .proto definition:

service Command {
    rpc RegisterCloud (CommandRequest) returns (stream CommandMessage) {}
    rpc NewCommandMsg (CommandMessage) returns (google.protobuf.Empty) {}
}
what do I try to achieve?

Multiple clients shall call RegisterCommand and the server shall block inside 
the procedure until a call to NewCommandMsg happened (I guarantee that only one 
single call happens at a time). If NewCommandMsg is called, the argument 
CommandMessage shall be transported to every thread of RegisterCommand (I 
understood every call is handled in a thread), the thread shall be unblocked 
and the CommandMessage shall be written to the stream. After that, the threads 
of RegisterCommand shall be blocked again and wait for the next call to 
NewCommandMsg. Later, the NewCommandMsg will be replaced by a single non-grpc 
thread.

What did I already do

I read a lot about (shared) futures, promise, mutex and conditional variables 
in C++ and implemented the following code.

class CommandServiceImpl final : public Command::Service {
    //To my understanding these are common for all threads
    std::promise<CommandMessage> newCommandPromise;
    std::shared_future<CommandMessage> newCommandFuture = 
this->newCommandPromise.get_future();

    //To my understanding this is executed in an own thread
    Status RegisterCommand(ServerContext* context, const CommandRequest* 
request, ServerWriter<CommandMessage>* writer) override {
        //Each thread gets its own copy of the shared future
        std::shared_future<CommandMessage> future = this->newCommandFuture;
        while(!context->IsCancelled()){
            future.wait();
            (void)future.get();
            std::cout << "distributing command" << std::endl;
            //actual writing would happen here
        }

        return Status::CANCELLED;
    }

    //To my understanding this is executed in an own thread
    Status NewCommandMsg(ServerContext* context, const CommandMessage* request, 
google::protobuf::Empty* response) override {
        std::promise<CommandMessage> promise = move(this->newCommandPromise);

        std::cout << "new command received" << std::endl;
        promise.set_value(*request);

        //Provide new promise, for next call
        //In my evaluation phase, I guarantee, that only one client at a time 
will call NewCommandMsg
        std::promise<CommandMessage> cleanPromise;
        this->newCommandPromise = move(cleanPromise);

        return Status::OK;
    }
};
What happens with that code

After one or multiple concurrent calls to RegisterCommand, the server blocks 
and after a call to NewCommandMessage, the future.wait() unblocks, which is 
expected. After that, of course future.wait() is always non-blocking, so that 
the threads run in an infinite loop. But it may only run exactly once and then 
wait for new data to be available.

It seems that it is not possible to "reuse" an existing future. Any ideas on 
how to achieve my goal?

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/78dbca9e-9ece-4632-a18a-6a2bd12f93d6%40googlegroups.com.

Reply via email to