Thanks for the example, but I have one doubt about the client code:

call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_, 
(void *)call);

What if we receive a response in bool HandleResponse(bool responseStatus)
before call->response_reader is assigned? Isn't that a potential race
condition? We could have a context switch after the RPC is fully
initialized, but before the assignment happens.

On Tuesday, December 13, 2022 at 10:34:11 a.m. UTC-5 Tom Mclean wrote:

>
> Amazing.....
>
> Thank you so much.
> On Thursday, April 27, 2017 at 7:10:56 AM UTC+1 kuldeepm...@gmail.com 
> wrote:
>
>> While implementing asynchronous streaming gRPC, I found no
>> straightforward examples that could be used, so I had a hard time
>> implementing it. Now that I have implemented a hello-world version of
>> async streaming, I thought I would share it.
>>
>> In this example, the client requests a stream of replies by initiating an
>> asynchronous RPC, and the server responds with 5 replies. After the server
>> has sent the 5 replies, it closes the RPC by calling Finish, and then the
>> client closes the RPC.
>>
>> Hope it will be helpful.
>>
>> Thanks
>> Kuldeep
>>
>> *Protocol Buffer:*
>> // The greeting service definition.
>> service Greeter {
>>   // Sends a greeting
>>   rpc SayHello (HelloRequest) returns (stream HelloReply) {}
>> }
>>
>> // The request message containing the user's name.
>> message HelloRequest {
>>   string name = 1;
>> }
>>
>> // The response message containing the greetings
>> message HelloReply {
>>   string message = 1;
>> }
>>
>> Server Code:
>> class ServerImpl final {
>> public:
>>     ~ServerImpl() {
>>         server_->Shutdown();
>>         // Always shutdown the completion queue after the server.
>>         cq_->Shutdown();
>>     }
>>
>>     // There is no shutdown handling in this code.
>>     void Run() {
>>         std::string server_address("0.0.0.0:50051");
>>
>>         ServerBuilder builder;
>>         // Listen on the given address without any authentication 
>> mechanism.
>>         builder.AddListeningPort(server_address, 
>> grpc::InsecureServerCredentials());
>>         // Register "service_" as the instance through which we'll 
>> communicate with
>>         // clients. In this case it corresponds to an *asynchronous* 
>> service.
>>         builder.RegisterService(&service_);
>>         // Get hold of the completion queue used for the asynchronous 
>> communication
>>         // with the gRPC runtime.
>>         cq_ = builder.AddCompletionQueue();
>>         // Finally assemble the server.
>>         server_ = builder.BuildAndStart();
>>         std::cout << "Server listening on " << server_address << 
>> std::endl;
>>
>>         // Proceed to the server's main loop.
>>         HandleRpcs();
>>     }
>>
>> private:
>>     // Class encompasing the state and logic needed to serve a request.
>>     class CallData {
>>     public:
>>         // Take in the "service" instance (in this case representing an 
>> asynchronous
>>         // server) and the completion queue "cq" used for asynchronous 
>> communication
>>         // with the gRPC runtime.
>>         CallData(Greeter::AsyncService* service, ServerCompletionQueue* 
>> cq)
>>             : service_(service), cq_(cq), repliesSent_(0), 
>> responder_(&ctx_), status_(CREATE) {
>>             // Invoke the serving logic right away.
>>             Proceed();
>>         }
>>
>>         void Proceed() {
>>             if (status_ == CREATE) {
>>                 // Make this instance progress to the PROCESS state.
>>                 status_ = PROCESS;
>>                 std::cout << "Creating Call data for new client 
>> connections: " << this << std::endl;
>>                 // As part of the initial CREATE state, we *request* that 
>> the system
>>                 // start processing SayHello requests. In this request, 
>> "this" acts are
>>                 // the tag uniquely identifying the request (so that 
>> different CallData
>>                 // instances can serve different requests concurrently), 
>> in this case
>>                 // the memory address of this CallData instance.
>>                 service_->RequestSayHello(&ctx_, &request_, &responder_, 
>> cq_, cq_,
>>                                           (void*) this);
>>             } else if (status_ == PROCESS) {
>>                 // Spawn a new CallData instance to serve new clients 
>> while we process
>>                 // the one for this CallData. The instance will 
>> deallocate itself as
>>                 // part of its FINISH state.
>>                 new CallData(service_, cq_);
>>
>>                 // The actual processing.
>>                 std::string prefix("Hello ");
>>                 reply_.set_message(prefix + request_.name() +
>>                     std::to_string(repliesSent_ + 1));
>>                 std::cout << "Sending reponse: " << this << " : " << 
>> reply_.message() << std::endl;
>>                 responder_.Write(reply_, this);
>>                 status_ = PROCESSING;
>>                 repliesSent_++;
>>
>>             } else if (status_ == PROCESSING) {
>>                 if (repliesSent_ == MAX_REPLIES) {
>>                     // And we are done! Let the gRPC runtime know we've 
>> finished, using the
>>                     // memory address of this instance as the uniquely 
>> identifying tag for
>>                     // the event.
>>                     status_ = FINISH;
>>                     responder_.Finish(Status::OK, this);
>>                 } else {
>>                     // The actual processing.
>>                     std::string prefix("Hello ");
>>                     reply_.set_message(prefix + request_.name() + 
>> std::to_string(repliesSent_ + 1));
>>                     std::cout << "Sending reponse: " << this << " : " << 
>> reply_.message() << std::endl;
>>                     responder_.Write(reply_, this);
>>                     status_ = PROCESSING;
>>                     repliesSent_++;
>>                 }
>>             } else {
>>                 GPR_ASSERT(status_ == FINISH);
>>                 std::cout << "Completed RPC for: " << this << std::endl;
>>                 // Once in the FINISH state, deallocate ourselves 
>> (CallData).
>>                 delete this;
>>             }
>>         }
>>
>>     private:
>>         // The means of communication with the gRPC runtime for an 
>> asynchronous
>>         // server.
>>         Greeter::AsyncService* service_;
>>         // The producer-consumer queue where for asynchronous server 
>> notifications.
>>         ServerCompletionQueue* cq_;
>>         // Context for the rpc, allowing to tweak aspects of it such as 
>> the use
>>         // of compression, authentication, as well as to send metadata 
>> back to the
>>         // client.
>>         ServerContext ctx_;
>>
>>         // What we get from the client.
>>         HelloRequest request_;
>>         // What we send back to the client.
>>         HelloReply reply_;
>>
>>         uint32_t repliesSent_;
>>         const uint32_t MAX_REPLIES = 5;
>>
>>         // The means to get back to the client.
>>         ServerAsyncWriter<HelloReply> responder_;
>>
>>         // Let's implement a tiny state machine with the following states.
>>         enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
>>         CallStatus status_;  // The current serving state.
>>     };
>>
>>     // This can be run in multiple threads if needed.
>>     void HandleRpcs() {
>>         // Spawn a new CallData instance to serve new clients.
>>         new CallData(&service_, cq_.get());
>>         void* tag;  // uniquely identifies a request.
>>         bool ok;
>>         while (true) {
>>             // Block waiting to read the next event from the completion 
>> queue. The
>>             // event is uniquely identified by its tag, which in this 
>> case is the
>>             // memory address of a CallData instance.
>>             // The return value of Next should always be checked. This 
>> return value
>>             // tells us whether there is any kind of event or cq_ is 
>> shutting down.
>>             GPR_ASSERT(cq_->Next(&tag, &ok));
>>             GPR_ASSERT(ok);
>>             static_cast<CallData*>(tag)->Proceed();
>>         }
>>     }
>>
>>     std::unique_ptr<ServerCompletionQueue> cq_;
>>     Greeter::AsyncService service_;
>>     std::unique_ptr<Server> server_;
>> };
>>
>> int main(int argc, char** argv) {
>>     ServerImpl server;
>>     server.Run();
>>
>>     return 0;
>> }
>>
>>
>> Client Code:
>> class GreeterClient {
>> public:
>>     explicit GreeterClient(std::shared_ptr<Channel> channel)
>>         : stub_(Greeter::NewStub(channel)) {}
>>
>>     // Assembles the client's payload and sends it to the server.
>>     void SayHello(const std::string& user) {
>>         HelloRequest request;
>>         // Data we are sending to the server.
>>         request.set_name(user);
>>
>>         // Call object to store rpc data
>>         AsyncClientCall* call = new AsyncClientCall;
>>
>>         // stub_->AsyncSayHello() performs the RPC call, returning an 
>> instance to
>>         // store in "call". Because we are using the asynchronous API, we 
>> need to
>>         // hold on to the "call" instance in order to get updates on the 
>> ongoing RPC.
>>         call->response_reader = stub_->AsyncSayHello(&call->context, 
>> request, &cq_, (void *)call);
>>     }
>>
>>     // Loop while listening for completed responses.
>>     // Prints out the response from the server.
>>     void AsyncCompleteRpc() {
>>         void* got_tag;
>>         bool ok = false;
>>
>>         // Block until the next result is available in the completion 
>> queue "cq".
>>         while (cq_.Next(&got_tag, &ok)) {
>>             // The tag in this example is the memory location of the call 
>> object
>>             ResponseHandler* responseHandler = 
>> static_cast<ResponseHandler*>(got_tag);
>>             std::cout << "Tag received: " << responseHandler << std::endl;
>>
>>             // Verify that the request was completed successfully. Note 
>> that "ok"
>>             // corresponds solely to the request for updates introduced 
>> by Finish().
>>             std::cout << "Next returned: " << ok << std::endl;
>>             responseHandler->HandleResponse(ok);
>>         }
>>     }
>>
>> private:
>>
>>     class ResponseHandler {
>>     public:
>>         virtual bool HandleResponse(bool eventStatus) = 0;
>>     };
>>
>>     // struct for keeping state and data information
>>     class AsyncClientCall: public ResponseHandler {
>>         enum CallStatus {CREATE, PROCESS, PROCESSED, FINISH};
>>         CallStatus callStatus_;
>>     public:
>>
>>         AsyncClientCall(): callStatus_(CREATE) {}
>>
>>         // Container for the data we expect from the server.
>>         HelloReply reply;
>>         // Context for the client. It could be used to convey extra 
>> information to
>>         // the server and/or tweak certain RPC behaviors.
>>         ClientContext context;
>>
>>         // Storage for the status of the RPC upon completion.
>>         Status status;
>>
>>         //std::unique_ptr<ClientAsyncResponseReader<HelloReply>> 
>> response_reader;
>>         std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> 
>> response_reader;
>>
>>         bool HandleResponse(bool responseStatus) override {
>>             switch (callStatus_) {
>>             case CREATE:
>>                 if (responseStatus) {
>>                     response_reader->Read(&reply, (void*)this);
>>                     callStatus_ = PROCESS;
>>                 } else {
>>                     response_reader->Finish(&status, (void*)this);
>>                     callStatus_ = FINISH;
>>                 }
>>                 break;
>>             case PROCESS:
>>                 if (responseStatus) {
>>                     std::cout << "Greeter received: " << this << " : " << 
>> reply.message() << std::endl;
>>                     response_reader->Read(&reply, (void*)this);
>>                 } else {
>>                     response_reader->Finish(&status, (void*)this);
>>                     callStatus_ = FINISH;
>>                 }
>>                 break;
>>             case FINISH:
>>                 if (status.ok()) {
>>
>>                     std::cout << "Server Response Completed: " << this << 
>> " CallData: " << this << std::endl;
>>                 }
>>                 else {
>>                     std::cout << "RPC failed" << std::endl;
>>                 }
>>                 delete this;
>>             }
>>         }
>>     };
>>
>>     // Out of the passed in Channel comes the stub, stored here, our view 
>> of the
>>     // server's exposed services.
>>     std::unique_ptr<Greeter::Stub> stub_;
>>
>>     // The producer-consumer queue we use to communicate asynchronously 
>> with the
>>     // gRPC runtime.
>>     CompletionQueue cq_;
>> };
>>
>> int main(int argc, char** argv) {
>>     // Instantiate the client. It requires a channel, out of which the 
>> actual RPCs
>>     // are created. This channel models a connection to an endpoint (in 
>> this case,
>>     // localhost at port 50051). We indicate that the channel isn't 
>> authenticated
>>     // (use of InsecureChannelCredentials()).
>>     GreeterClient greeter(grpc::CreateChannel(
>>                               "localhost:50051", 
>> grpc::InsecureChannelCredentials()));
>>
>>     // Spawn reader thread that loops indefinitely
>>     std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, 
>> &greeter);
>>
>>     std::string user("world");
>>     greeter.SayHello(user);  // The actual RPC call!
>>
>>     std::cout << "Press control-c to quit" << std::endl << std::endl;
>>     thread_.join();  //blocks forever
>>
>>     return 0;
>> }
>>
>>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/9b9275b2-f612-46ca-a8c0-e9b7b55ebb98n%40googlegroups.com.

Reply via email to