I think the use case you are describing is better implemented using a condition variable. All threads wait on the condition variable and then the newcommandmsg will signal all. Then the threads execute and can wait again on same condition. this is how consumer/producer pattern works, which is what you are implementing essentially.
On Wed, May 13, 2020 at 4:36 PM <[email protected]> wrote: > Hey, > for a project I start understanding how gRPC works. For this I implemented > the following setup: > > A C++ server using the sync API offers two services > RegisterCommand(streaming) and NewCommandMsg(blocking). This is the .proto > definition: > > service Command { > rpc RegisterCloud (CommandRequest) returns (stream CommandMessage) {} > rpc NewCommandMsg (CommandMessage) returns (google.protobuf.Empty) {} > } > what do I try to achieve? > > Multiple clients shall call RegisterCommand and the server shall block > inside the procedure until a call to NewCommandMsg happened (I guarantee > that only one single call happens at a time). If NewCommandMsg is called, > the argument CommandMessage shall be transported to every thread of > RegisterCommand (I understood every call is handled in a thread), the > thread shall be unblocked and the CommandMessage shall be written to the > stream. After that, the threads of RegisterCommand shall be blocked again > and wait for the next call to NewCommandMsg. Later, the NewCommandMsg will > be replaced by a single non-grpc thread. > > What did I already do > > I read a lot about (shared) futures, promise, mutex and conditional > variables in C++ and implemented the following code. > > class CommandServiceImpl final : public Command::Service { > //To my understanding these are common for all threads > std::promise<CommandMessage> newCommandPromise; > std::shared_future<CommandMessage> newCommandFuture = > this->newCommandPromise.get_future(); > > //To my understanding this is executed in an own thread > Status RegisterCommand(ServerContext* context, const CommandRequest* > request, ServerWriter<CommandMessage>* writer) override { > //Each thread gets its own copy of the shared future > std::shared_future<CommandMessage> future = this->newCommandFuture; > while(!context->IsCancelled()){ > future.wait(); > (void)future.get(); > std::cout << "distributing command" << std::endl; > //actual writing would happen here > } > > return Status::CANCELLED; > } > > //To my understanding this is executed in an own thread > Status NewCommandMsg(ServerContext* context, const CommandMessage* > request, google::protobuf::Empty* response) override { > std::promise<CommandMessage> promise = > move(this->newCommandPromise); > > std::cout << "new command received" << std::endl; > promise.set_value(*request); > > //Provide new promise, for next call > //In my evaluation phase, I guarantee, that only one client at a > time will call NewCommandMsg > std::promise<CommandMessage> cleanPromise; > this->newCommandPromise = move(cleanPromise); > > return Status::OK; > } > }; > What happens with that code > > After one or multiple concurrent calls to RegisterCommand, the server > blocks and after a call to NewCommandMessage, the future.wait() unblocks, > which is expected. After that, of course future.wait() is always > non-blocking, so that the threads run in an infinite loop. But it may only > run exactly once and then wait for new data to be available. > > It seems that it is not possible to "reuse" an existing future. Any ideas > on how to achieve my goal? > > -- > You received this message because you are subscribed to the Google Groups " > grpc.io" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To view this discussion on the web visit > https://groups.google.com/d/msgid/grpc-io/78dbca9e-9ece-4632-a18a-6a2bd12f93d6%40googlegroups.com > . > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/CAA-WHumDH3n2TdQVu%2BkT6UT16qKSPkpoYh_UwEfsv24ijN9uTA%40mail.gmail.com.
