Thanks for example, but I have one doubt in Client code: call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_, (void *)call);
What if we receive response in bool HandleResponse(bool responseStatus) before call->response_reader is assigned. Isn't it potential racing condition? We could have context switch after RPC is fully initialized, but before assignment happens. On Tuesday, December 13, 2022 at 10:34:11 a.m. UTC-5 Tom Mclean wrote: > > Amazing..... > > Thank you so much. > On Thursday, April 27, 2017 at 7:10:56 AM UTC+1 kuldeepm...@gmail.com > wrote: > >> While implementing asynchronous streaming grpc, there are no straight >> forward examples that can be used, it was hard time for me to implement it. >> Now that I have implemented the hello world version of async stream >> version, I thought I will share. >> >> In this example, client will request for stream of replies by initiating >> asynchronous RPC and the server will respond with 5 replies, after server >> is done with 5 replies it will close the RPC by calling finish and then >> client will close the RPC. >> >> Hope it will be helpful. >> >> Thanks >> Kuldeep >> >> *Protocol Buffer:* >> // The greeting service definition. >> service Greeter { >> // Sends a greeting >> rpc SayHello (HelloRequest) returns (stream HelloReply) {} >> } >> >> // The request message containing the user's name. >> message HelloRequest { >> string name = 1; >> } >> >> // The response message containing the greetings >> message HelloReply { >> string message = 1; >> } >> >> Server Code: >> class ServerImpl final { >> public: >> ~ServerImpl() { >> server_->Shutdown(); >> // Always shutdown the completion queue after the server. >> cq_->Shutdown(); >> } >> >> // There is no shutdown handling in this code. >> void Run() { >> std::string server_address("0.0.0.0:50051"); >> >> ServerBuilder builder; >> // Listen on the given address without any authentication >> mechanism. >> builder.AddListeningPort(server_address, >> grpc::InsecureServerCredentials()); >> // Register "service_" as the instance through which we'll >> communicate with >> // clients. In this case it corresponds to an *asynchronous* >> service. >> builder.RegisterService(&service_); >> // Get hold of the completion queue used for the asynchronous >> communication >> // with the gRPC runtime. >> cq_ = builder.AddCompletionQueue(); >> // Finally assemble the server. >> server_ = builder.BuildAndStart(); >> std::cout << "Server listening on " << server_address << >> std::endl; >> >> // Proceed to the server's main loop. >> HandleRpcs(); >> } >> >> private: >> // Class encompasing the state and logic needed to serve a request. >> class CallData { >> public: >> // Take in the "service" instance (in this case representing an >> asynchronous >> // server) and the completion queue "cq" used for asynchronous >> communication >> // with the gRPC runtime. >> CallData(Greeter::AsyncService* service, ServerCompletionQueue* >> cq) >> : service_(service), cq_(cq), repliesSent_(0), >> responder_(&ctx_), status_(CREATE) { >> // Invoke the serving logic right away. >> Proceed(); >> } >> >> void Proceed() { >> if (status_ == CREATE) { >> // Make this instance progress to the PROCESS state. >> status_ = PROCESS; >> std::cout << "Creating Call data for new client >> connections: " << this << std::endl; >> // As part of the initial CREATE state, we *request* that >> the system >> // start processing SayHello requests. In this request, >> "this" acts are >> // the tag uniquely identifying the request (so that >> different CallData >> // instances can serve different requests concurrently), >> in this case >> // the memory address of this CallData instance. >> service_->RequestSayHello(&ctx_, &request_, &responder_, >> cq_, cq_, >> (void*) this); >> } else if (status_ == PROCESS) { >> // Spawn a new CallData instance to serve new clients >> while we process >> // the one for this CallData. The instance will >> deallocate itself as >> // part of its FINISH state. >> new CallData(service_, cq_); >> >> // The actual processing. >> std::string prefix("Hello "); >> reply_.set_message(prefix + request_.name() + >> std::to_string(repliesSent_ + 1)); >> std::cout << "Sending reponse: " << this << " : " << >> reply_.message() << std::endl; >> responder_.Write(reply_, this); >> status_ = PROCESSING; >> repliesSent_++; >> >> } else if (status_ == PROCESSING) { >> if (repliesSent_ == MAX_REPLIES) { >> // And we are done! Let the gRPC runtime know we've >> finished, using the >> // memory address of this instance as the uniquely >> identifying tag for >> // the event. >> status_ = FINISH; >> responder_.Finish(Status::OK, this); >> } else { >> // The actual processing. >> std::string prefix("Hello "); >> reply_.set_message(prefix + request_.name() + >> std::to_string(repliesSent_ + 1)); >> std::cout << "Sending reponse: " << this << " : " << >> reply_.message() << std::endl; >> responder_.Write(reply_, this); >> status_ = PROCESSING; >> repliesSent_++; >> } >> } else { >> GPR_ASSERT(status_ == FINISH); >> std::cout << "Completed RPC for: " << this << std::endl; >> // Once in the FINISH state, deallocate ourselves >> (CallData). >> delete this; >> } >> } >> >> private: >> // The means of communication with the gRPC runtime for an >> asynchronous >> // server. >> Greeter::AsyncService* service_; >> // The producer-consumer queue where for asynchronous server >> notifications. >> ServerCompletionQueue* cq_; >> // Context for the rpc, allowing to tweak aspects of it such as >> the use >> // of compression, authentication, as well as to send metadata >> back to the >> // client. >> ServerContext ctx_; >> >> // What we get from the client. >> HelloRequest request_; >> // What we send back to the client. >> HelloReply reply_; >> >> uint32_t repliesSent_; >> const uint32_t MAX_REPLIES = 5; >> >> // The means to get back to the client. >> ServerAsyncWriter<HelloReply> responder_; >> >> // Let's implement a tiny state machine with the following states. >> enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH }; >> CallStatus status_; // The current serving state. >> }; >> >> // This can be run in multiple threads if needed. >> void HandleRpcs() { >> // Spawn a new CallData instance to serve new clients. >> new CallData(&service_, cq_.get()); >> void* tag; // uniquely identifies a request. >> bool ok; >> while (true) { >> // Block waiting to read the next event from the completion >> queue. The >> // event is uniquely identified by its tag, which in this >> case is the >> // memory address of a CallData instance. >> // The return value of Next should always be checked. This >> return value >> // tells us whether there is any kind of event or cq_ is >> shutting down. >> GPR_ASSERT(cq_->Next(&tag, &ok)); >> GPR_ASSERT(ok); >> static_cast<CallData*>(tag)->Proceed(); >> } >> } >> >> std::unique_ptr<ServerCompletionQueue> cq_; >> Greeter::AsyncService service_; >> std::unique_ptr<Server> server_; >> }; >> >> int main(int argc, char** argv) { >> ServerImpl server; >> server.Run(); >> >> return 0; >> } >> >> >> Client Code: >> class GreeterClient { >> public: >> explicit GreeterClient(std::shared_ptr<Channel> channel) >> : stub_(Greeter::NewStub(channel)) {} >> >> // Assembles the client's payload and sends it to the server. >> void SayHello(const std::string& user) { >> HelloRequest request; >> // Data we are sending to the server. >> request.set_name(user); >> >> // Call object to store rpc data >> AsyncClientCall* call = new AsyncClientCall; >> >> // stub_->AsyncSayHello() performs the RPC call, returning an >> instance to >> // store in "call". Because we are using the asynchronous API, we >> need to >> // hold on to the "call" instance in order to get updates on the >> ongoing RPC. >> call->response_reader = stub_->AsyncSayHello(&call->context, >> request, &cq_, (void *)call); >> } >> >> // Loop while listening for completed responses. >> // Prints out the response from the server. >> void AsyncCompleteRpc() { >> void* got_tag; >> bool ok = false; >> >> // Block until the next result is available in the completion >> queue "cq". >> while (cq_.Next(&got_tag, &ok)) { >> // The tag in this example is the memory location of the call >> object >> ResponseHandler* responseHandler = >> static_cast<ResponseHandler*>(got_tag); >> std::cout << "Tag received: " << responseHandler << std::endl; >> >> // Verify that the request was completed successfully. Note >> that "ok" >> // corresponds solely to the request for updates introduced >> by Finish(). >> std::cout << "Next returned: " << ok << std::endl; >> responseHandler->HandleResponse(ok); >> } >> } >> >> private: >> >> class ResponseHandler { >> public: >> virtual bool HandleResponse(bool eventStatus) = 0; >> }; >> >> // struct for keeping state and data information >> class AsyncClientCall: public ResponseHandler { >> enum CallStatus {CREATE, PROCESS, PROCESSED, FINISH}; >> CallStatus callStatus_; >> public: >> >> AsyncClientCall(): callStatus_(CREATE) {} >> >> // Container for the data we expect from the server. >> HelloReply reply; >> // Context for the client. It could be used to convey extra >> information to >> // the server and/or tweak certain RPC behaviors. >> ClientContext context; >> >> // Storage for the status of the RPC upon completion. >> Status status; >> >> //std::unique_ptr<ClientAsyncResponseReader<HelloReply>> >> response_reader; >> std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> >> response_reader; >> >> bool HandleResponse(bool responseStatus) override { >> switch (callStatus_) { >> case CREATE: >> if (responseStatus) { >> response_reader->Read(&reply, (void*)this); >> callStatus_ = PROCESS; >> } else { >> response_reader->Finish(&status, (void*)this); >> callStatus_ = FINISH; >> } >> break; >> case PROCESS: >> if (responseStatus) { >> std::cout << "Greeter received: " << this << " : " << >> reply.message() << std::endl; >> response_reader->Read(&reply, (void*)this); >> } else { >> response_reader->Finish(&status, (void*)this); >> callStatus_ = FINISH; >> } >> break; >> case FINISH: >> if (status.ok()) { >> >> std::cout << "Server Response Completed: " << this << >> " CallData: " << this << std::endl; >> } >> else { >> std::cout << "RPC failed" << std::endl; >> } >> delete this; >> } >> } >> }; >> >> // Out of the passed in Channel comes the stub, stored here, our view >> of the >> // server's exposed services. >> std::unique_ptr<Greeter::Stub> stub_; >> >> // The producer-consumer queue we use to communicate asynchronously >> with the >> // gRPC runtime. >> CompletionQueue cq_; >> }; >> >> int main(int argc, char** argv) { >> // Instantiate the client. It requires a channel, out of which the >> actual RPCs >> // are created. This channel models a connection to an endpoint (in >> this case, >> // localhost at port 50051). We indicate that the channel isn't >> authenticated >> // (use of InsecureChannelCredentials()). >> GreeterClient greeter(grpc::CreateChannel( >> "localhost:50051", >> grpc::InsecureChannelCredentials())); >> >> // Spawn reader thread that loops indefinitely >> std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, >> &greeter); >> >> std::string user("world"); >> greeter.SayHello(user); // The actual RPC call! >> >> std::cout << "Press control-c to quit" << std::endl << std::endl; >> thread_.join(); //blocks forever >> >> return 0; >> } >> >> -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/9b9275b2-f612-46ca-a8c0-e9b7b55ebb98n%40googlegroups.com.