Hey thank you for sharing this. Can you please check this out: https://groups.google.com/g/grpc-io/c/3LMvM62SAo0
В 9:10:56 UTC+3на четвъртък, 27 април 2017 г. [email protected] написа: > While implementing asynchronous streaming grpc, there are no straight > forward examples that can be used, it was hard time for me to implement it. > Now that I have implemented the hello world version of async stream > version, I thought I will share. > > In this example, client will request for stream of replies by initiating > asynchronous RPC and the server will respond with 5 replies, after server > is done with 5 replies it will close the RPC by calling finish and then > client will close the RPC. > > Hope it will be helpful. > > Thanks > Kuldeep > > *Protocol Buffer:* > // The greeting service definition. > service Greeter { > // Sends a greeting > rpc SayHello (HelloRequest) returns (stream HelloReply) {} > } > > // The request message containing the user's name. > message HelloRequest { > string name = 1; > } > > // The response message containing the greetings > message HelloReply { > string message = 1; > } > > Server Code: > class ServerImpl final { > public: > ~ServerImpl() { > server_->Shutdown(); > // Always shutdown the completion queue after the server. > cq_->Shutdown(); > } > > // There is no shutdown handling in this code. > void Run() { > std::string server_address("0.0.0.0:50051"); > > ServerBuilder builder; > // Listen on the given address without any authentication > mechanism. > builder.AddListeningPort(server_address, > grpc::InsecureServerCredentials()); > // Register "service_" as the instance through which we'll > communicate with > // clients. In this case it corresponds to an *asynchronous* > service. > builder.RegisterService(&service_); > // Get hold of the completion queue used for the asynchronous > communication > // with the gRPC runtime. > cq_ = builder.AddCompletionQueue(); > // Finally assemble the server. > server_ = builder.BuildAndStart(); > std::cout << "Server listening on " << server_address << std::endl; > > // Proceed to the server's main loop. > HandleRpcs(); > } > > private: > // Class encompasing the state and logic needed to serve a request. > class CallData { > public: > // Take in the "service" instance (in this case representing an > asynchronous > // server) and the completion queue "cq" used for asynchronous > communication > // with the gRPC runtime. > CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq) > : service_(service), cq_(cq), repliesSent_(0), > responder_(&ctx_), status_(CREATE) { > // Invoke the serving logic right away. > Proceed(); > } > > void Proceed() { > if (status_ == CREATE) { > // Make this instance progress to the PROCESS state. > status_ = PROCESS; > std::cout << "Creating Call data for new client > connections: " << this << std::endl; > // As part of the initial CREATE state, we *request* that > the system > // start processing SayHello requests. In this request, > "this" acts are > // the tag uniquely identifying the request (so that > different CallData > // instances can serve different requests concurrently), > in this case > // the memory address of this CallData instance. > service_->RequestSayHello(&ctx_, &request_, &responder_, > cq_, cq_, > (void*) this); > } else if (status_ == PROCESS) { > // Spawn a new CallData instance to serve new clients > while we process > // the one for this CallData. The instance will deallocate > itself as > // part of its FINISH state. > new CallData(service_, cq_); > > // The actual processing. > std::string prefix("Hello "); > reply_.set_message(prefix + request_.name() + > std::to_string(repliesSent_ + 1)); > std::cout << "Sending reponse: " << this << " : " << > reply_.message() << std::endl; > responder_.Write(reply_, this); > status_ = PROCESSING; > repliesSent_++; > > } else if (status_ == PROCESSING) { > if (repliesSent_ == MAX_REPLIES) { > // And we are done! Let the gRPC runtime know we've > finished, using the > // memory address of this instance as the uniquely > identifying tag for > // the event. > status_ = FINISH; > responder_.Finish(Status::OK, this); > } else { > // The actual processing. > std::string prefix("Hello "); > reply_.set_message(prefix + request_.name() + > std::to_string(repliesSent_ + 1)); > std::cout << "Sending reponse: " << this << " : " << > reply_.message() << std::endl; > responder_.Write(reply_, this); > status_ = PROCESSING; > repliesSent_++; > } > } else { > GPR_ASSERT(status_ == FINISH); > std::cout << "Completed RPC for: " << this << std::endl; > // Once in the FINISH state, deallocate ourselves > (CallData). > delete this; > } > } > > private: > // The means of communication with the gRPC runtime for an > asynchronous > // server. > Greeter::AsyncService* service_; > // The producer-consumer queue where for asynchronous server > notifications. > ServerCompletionQueue* cq_; > // Context for the rpc, allowing to tweak aspects of it such as > the use > // of compression, authentication, as well as to send metadata > back to the > // client. > ServerContext ctx_; > > // What we get from the client. > HelloRequest request_; > // What we send back to the client. > HelloReply reply_; > > uint32_t repliesSent_; > const uint32_t MAX_REPLIES = 5; > > // The means to get back to the client. > ServerAsyncWriter<HelloReply> responder_; > > // Let's implement a tiny state machine with the following states. > enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH }; > CallStatus status_; // The current serving state. > }; > > // This can be run in multiple threads if needed. > void HandleRpcs() { > // Spawn a new CallData instance to serve new clients. > new CallData(&service_, cq_.get()); > void* tag; // uniquely identifies a request. > bool ok; > while (true) { > // Block waiting to read the next event from the completion > queue. The > // event is uniquely identified by its tag, which in this case > is the > // memory address of a CallData instance. > // The return value of Next should always be checked. This > return value > // tells us whether there is any kind of event or cq_ is > shutting down. > GPR_ASSERT(cq_->Next(&tag, &ok)); > GPR_ASSERT(ok); > static_cast<CallData*>(tag)->Proceed(); > } > } > > std::unique_ptr<ServerCompletionQueue> cq_; > Greeter::AsyncService service_; > std::unique_ptr<Server> server_; > }; > > int main(int argc, char** argv) { > ServerImpl server; > server.Run(); > > return 0; > } > > > Client Code: > class GreeterClient { > public: > explicit GreeterClient(std::shared_ptr<Channel> channel) > : stub_(Greeter::NewStub(channel)) {} > > // Assembles the client's payload and sends it to the server. > void SayHello(const std::string& user) { > HelloRequest request; > // Data we are sending to the server. > request.set_name(user); > > // Call object to store rpc data > AsyncClientCall* call = new AsyncClientCall; > > // stub_->AsyncSayHello() performs the RPC call, returning an > instance to > // store in "call". Because we are using the asynchronous API, we > need to > // hold on to the "call" instance in order to get updates on the > ongoing RPC. > call->response_reader = stub_->AsyncSayHello(&call->context, > request, &cq_, (void *)call); > } > > // Loop while listening for completed responses. > // Prints out the response from the server. > void AsyncCompleteRpc() { > void* got_tag; > bool ok = false; > > // Block until the next result is available in the completion > queue "cq". > while (cq_.Next(&got_tag, &ok)) { > // The tag in this example is the memory location of the call > object > ResponseHandler* responseHandler = > static_cast<ResponseHandler*>(got_tag); > std::cout << "Tag received: " << responseHandler << std::endl; > > // Verify that the request was completed successfully. Note > that "ok" > // corresponds solely to the request for updates introduced by > Finish(). > std::cout << "Next returned: " << ok << std::endl; > responseHandler->HandleResponse(ok); > } > } > > private: > > class ResponseHandler { > public: > virtual bool HandleResponse(bool eventStatus) = 0; > }; > > // struct for keeping state and data information > class AsyncClientCall: public ResponseHandler { > enum CallStatus {CREATE, PROCESS, PROCESSED, FINISH}; > CallStatus callStatus_; > public: > > AsyncClientCall(): callStatus_(CREATE) {} > > // Container for the data we expect from the server. > HelloReply reply; > // Context for the client. It could be used to convey extra > information to > // the server and/or tweak certain RPC behaviors. > ClientContext context; > > // Storage for the status of the RPC upon completion. > Status status; > > //std::unique_ptr<ClientAsyncResponseReader<HelloReply>> > response_reader; > std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> > response_reader; > > bool HandleResponse(bool responseStatus) override { > switch (callStatus_) { > case CREATE: > if (responseStatus) { > response_reader->Read(&reply, (void*)this); > callStatus_ = PROCESS; > } else { > response_reader->Finish(&status, (void*)this); > callStatus_ = FINISH; > } > break; > case PROCESS: > if (responseStatus) { > std::cout << "Greeter received: " << this << " : " << > reply.message() << std::endl; > response_reader->Read(&reply, (void*)this); > } else { > response_reader->Finish(&status, (void*)this); > callStatus_ = FINISH; > } > break; > case FINISH: > if (status.ok()) { > > std::cout << "Server Response Completed: " << this << > " CallData: " << this << std::endl; > } > else { > std::cout << "RPC failed" << std::endl; > } > delete this; > } > } > }; > > // Out of the passed in Channel comes the stub, stored here, our view > of the > // server's exposed services. > std::unique_ptr<Greeter::Stub> stub_; > > // The producer-consumer queue we use to communicate asynchronously > with the > // gRPC runtime. > CompletionQueue cq_; > }; > > int main(int argc, char** argv) { > // Instantiate the client. It requires a channel, out of which the > actual RPCs > // are created. This channel models a connection to an endpoint (in > this case, > // localhost at port 50051). We indicate that the channel isn't > authenticated > // (use of InsecureChannelCredentials()). > GreeterClient greeter(grpc::CreateChannel( > "localhost:50051", > grpc::InsecureChannelCredentials())); > > // Spawn reader thread that loops indefinitely > std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, > &greeter); > > std::string user("world"); > greeter.SayHello(user); // The actual RPC call! > > std::cout << "Press control-c to quit" << std::endl << std::endl; > thread_.join(); //blocks forever > > return 0; > } > > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/b9550766-a225-40ef-9f5b-3230d9517b17n%40googlegroups.com.
