Hey,
for a project I am starting to learn how gRPC works. For this I implemented the
following setup:
A C++ server using the sync API offers two RPCs: RegisterCommand (server
streaming) and NewCommandMsg (unary, blocking). This is the .proto definition:
service Command {
  rpc RegisterCommand (CommandRequest) returns (stream CommandMessage) {}
  rpc NewCommandMsg (CommandMessage) returns (google.protobuf.Empty) {}
}
What am I trying to achieve?
Multiple clients shall call RegisterCommand, and the server shall block inside
that handler until NewCommandMsg is called (I guarantee that only one call to
NewCommandMsg happens at a time). When NewCommandMsg is called, its
CommandMessage argument shall be handed to every thread running
RegisterCommand (as I understand it, every call is handled in its own thread);
each thread shall unblock and write the CommandMessage to its stream. After
that, the RegisterCommand threads shall block again and wait for the next call
to NewCommandMsg. Later, NewCommandMsg will be replaced by a single non-gRPC
thread.
What have I already done?
I read a lot about (shared) futures, promises, mutexes and condition variables
in C++ and implemented the following code.
class CommandServiceImpl final : public Command::Service {
    // To my understanding these are shared by all handler threads
    std::promise<CommandMessage> newCommandPromise;
    std::shared_future<CommandMessage> newCommandFuture =
        this->newCommandPromise.get_future();

    // To my understanding this is executed in its own thread
    Status RegisterCommand(ServerContext* context, const CommandRequest* request,
                           ServerWriter<CommandMessage>* writer) override {
        // Each thread gets its own copy of the shared future
        std::shared_future<CommandMessage> future = this->newCommandFuture;
        while (!context->IsCancelled()) {
            future.wait();
            (void)future.get();
            std::cout << "distributing command" << std::endl;
            // actual writing would happen here
        }
        return Status::CANCELLED;
    }

    // To my understanding this is executed in its own thread
    Status NewCommandMsg(ServerContext* context, const CommandMessage* request,
                         google::protobuf::Empty* response) override {
        std::promise<CommandMessage> promise = std::move(this->newCommandPromise);
        std::cout << "new command received" << std::endl;
        promise.set_value(*request);
        // Provide a new promise for the next call.
        // In my evaluation phase, I guarantee that only one client at a time
        // will call NewCommandMsg.
        std::promise<CommandMessage> cleanPromise;
        this->newCommandPromise = std::move(cleanPromise);
        return Status::OK;
    }
};
What happens with that code
After one or more concurrent calls to RegisterCommand, the server blocks, and
after a call to NewCommandMsg, future.wait() unblocks, which is expected. From
then on, of course, future.wait() never blocks again, so the threads spin in
an infinite loop. What I want instead is for the loop body to run exactly once
per message and then wait until new data is available.
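To make the behaviour concrete without gRPC, here is a minimal standalone sketch. The names isReady, Observation and observeSharedFuture are made up for illustration, and int stands in for CommandMessage. It shows that a copy of a shared_future stays ready once its value is set, even after the promise object has been replaced by a fresh one:

```cpp
#include <chrono>
#include <future>

// True if the shared state already holds a value, i.e. wait() would not block.
bool isReady(const std::shared_future<int>& f) {
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Hypothetical record of what was observed at each step.
struct Observation {
    bool readyBefore;           // before set_value
    bool readyAfterSet;         // right after set_value
    bool readyAfterNewPromise;  // after the promise was replaced
    int value;
};

Observation observeSharedFuture() {
    std::promise<int> promise;
    std::shared_future<int> future = promise.get_future().share();

    Observation o{};
    o.readyBefore = isReady(future);

    promise.set_value(42);
    o.readyAfterSet = isReady(future);

    // Same idea as assigning cleanPromise in NewCommandMsg: the old future
    // still refers to the old, already satisfied shared state.
    promise = std::promise<int>();
    o.readyAfterNewPromise = isReady(future);

    o.value = future.get();  // get() on a shared_future can be called repeatedly
    return o;
}
```

In this sketch the future only becomes "not ready" again if a thread takes a new future from the new promise; the copy it already holds never resets.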
It seems that it is not possible to "reuse" an existing future. Any ideas on
how to achieve my goal?