Hey, thank you for sharing this. Could you please take a look at this 
thread: https://groups.google.com/g/grpc-io/c/3LMvM62SAo0

В 9:10:56 UTC+3на четвъртък, 27 април 2017 г. [email protected] написа:

> There are no straightforward examples of asynchronous streaming gRPC, so I 
> had a hard time implementing it. Now that I have a working hello-world 
> version of an asynchronous streaming RPC, I thought I would share it.
>
> In this example, the client requests a stream of replies by initiating an 
> asynchronous RPC, and the server responds with 5 replies. After the fifth 
> reply the server closes the RPC by calling Finish, and the client then 
> completes its side of the RPC.
>
> Hope it will be helpful.
>
> Thanks
> Kuldeep
>
> *Protocol Buffer:*
> // The greeting service definition.
> service Greeter {
>   // Server-streaming RPC: takes a single HelloRequest and returns a
>   // stream of HelloReply messages.
>   rpc SayHello (HelloRequest) returns (stream HelloReply) {}
> }
>
> // The request message containing the user's name.
> message HelloRequest {
>   string name = 1;
> }
>
> // The response message containing one greeting; several of these are
> // streamed back per request.
> message HelloReply {
>   string message = 1;
> }
>
> Server Code:
> // Asynchronous Greeter server. Owns the gRPC server, its completion queue
> // and the async service, and drives every RPC from HandleRpcs().
> class ServerImpl final {
> public:
>     ~ServerImpl() {
>         // Run() may never have been called, in which case there is
>         // nothing to shut down.
>         if (server_) {
>             server_->Shutdown();
>         }
>         // Always shutdown the completion queue after the server.
>         if (cq_) {
>             cq_->Shutdown();
>         }
>     }
>
>     // Builds and starts the server on 0.0.0.0:50051, then blocks in the
>     // event loop. There is no shutdown handling in this code.
>     void Run() {
>         std::string server_address("0.0.0.0:50051");
>
>         ServerBuilder builder;
>         // Listen on the given address without any authentication
>         // mechanism.
>         builder.AddListeningPort(server_address,
>                                  grpc::InsecureServerCredentials());
>         // Register "service_" as the instance through which we'll
>         // communicate with clients. In this case it corresponds to an
>         // *asynchronous* service.
>         builder.RegisterService(&service_);
>         // Get hold of the completion queue used for the asynchronous
>         // communication with the gRPC runtime.
>         cq_ = builder.AddCompletionQueue();
>         // Finally assemble the server.
>         server_ = builder.BuildAndStart();
>         std::cout << "Server listening on " << server_address << std::endl;
>
>         // Proceed to the server's main loop.
>         HandleRpcs();
>     }
>
> private:
>     // Class encompassing the state and logic needed to serve one
>     // request. Instances are heap-allocated and delete themselves when
>     // their RPC is over.
>     class CallData {
>     public:
>         // Take in the "service" instance (in this case representing an
>         // asynchronous server) and the completion queue "cq" used for
>         // asynchronous communication with the gRPC runtime.
>         CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
>             : service_(service), cq_(cq), repliesSent_(0),
>               responder_(&ctx_), status_(CREATE) {
>             // Invoke the serving logic right away.
>             Proceed(true);
>         }
>
>         // Advances the state machine by one step. "ok" is the success
>         // flag the completion queue reported for the operation that just
>         // completed; when it is false the call cannot make progress, so
>         // only this call is torn down instead of aborting the server.
>         void Proceed(bool ok) {
>             if (status_ == CREATE) {
>                 // Make this instance progress to the PROCESS state.
>                 status_ = PROCESS;
>                 std::cout << "Creating Call data for new client connections: "
>                           << this << std::endl;
>                 // As part of the initial CREATE state, we *request* that
>                 // the system start processing SayHello requests. In this
>                 // request, "this" acts as the tag uniquely identifying
>                 // the request (so that different CallData instances can
>                 // serve different requests concurrently), in this case
>                 // the memory address of this CallData instance.
>                 service_->RequestSayHello(&ctx_, &request_, &responder_,
>                                           cq_, cq_, this);
>                 return;
>             }
>
>             if (status_ == PROCESS) {
>                 if (!ok) {
>                     // The pending request was never matched to an
>                     // incoming RPC (the server is shutting down).
>                     delete this;
>                     return;
>                 }
>                 // Spawn a new CallData instance to serve new clients
>                 // while we process the one for this CallData. The
>                 // instance will deallocate itself as part of its FINISH
>                 // state.
>                 new CallData(service_, cq_);
>                 SendNextReply();
>                 return;
>             }
>
>             if (status_ == PROCESSING) {
>                 if (!ok) {
>                     // The previous Write did not go out because the call
>                     // is already dead (cancelled, client gone, ...).
>                     std::cout << "RPC terminated early for: " << this
>                               << std::endl;
>                     delete this;
>                     return;
>                 }
>                 if (repliesSent_ == MAX_REPLIES) {
>                     // And we are done! Let the gRPC runtime know we've
>                     // finished, using the memory address of this
>                     // instance as the uniquely identifying tag for the
>                     // event.
>                     status_ = FINISH;
>                     responder_.Finish(Status::OK, this);
>                 } else {
>                     SendNextReply();
>                 }
>                 return;
>             }
>
>             GPR_ASSERT(status_ == FINISH);
>             std::cout << "Completed RPC for: " << this << std::endl;
>             // Once in the FINISH state, deallocate ourselves (CallData),
>             // whether or not the final status reached the client.
>             delete this;
>         }
>
>     private:
>         // Builds the next numbered greeting and queues it for writing.
>         // The completion of the write re-enters Proceed() with this
>         // instance as the tag.
>         void SendNextReply() {
>             std::string prefix("Hello ");
>             reply_.set_message(prefix + request_.name() +
>                                std::to_string(repliesSent_ + 1));
>             std::cout << "Sending reponse: " << this << " : "
>                       << reply_.message() << std::endl;
>             responder_.Write(reply_, this);
>             status_ = PROCESSING;
>             repliesSent_++;
>         }
>
>         // The means of communication with the gRPC runtime for an
>         // asynchronous server.
>         Greeter::AsyncService* service_;
>         // The producer-consumer queue for asynchronous server
>         // notifications.
>         ServerCompletionQueue* cq_;
>         // Context for the rpc, allowing to tweak aspects of it such as
>         // the use of compression, authentication, as well as to send
>         // metadata back to the client.
>         ServerContext ctx_;
>
>         // What we get from the client.
>         HelloRequest request_;
>         // What we send back to the client.
>         HelloReply reply_;
>
>         // Number of replies queued so far, and how many to send in total.
>         uint32_t repliesSent_;
>         const uint32_t MAX_REPLIES = 5;
>
>         // The means to get back to the client.
>         ServerAsyncWriter<HelloReply> responder_;
>
>         // Let's implement a tiny state machine with the following states.
>         enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
>         CallStatus status_;  // The current serving state.
>     };
>
>     // This can be run in multiple threads if needed.
>     void HandleRpcs() {
>         // Spawn a new CallData instance to serve new clients.
>         new CallData(&service_, cq_.get());
>         void* tag = nullptr;  // uniquely identifies a request.
>         bool ok = false;
>         while (true) {
>             // Block waiting to read the next event from the completion
>             // queue. The event is uniquely identified by its tag, which
>             // in this case is the memory address of a CallData instance.
>             // The return value of Next should always be checked. This
>             // return value tells us whether there is any kind of event
>             // or cq_ is shutting down.
>             GPR_ASSERT(cq_->Next(&tag, &ok));
>             // "ok" is false when a single operation failed (for example
>             // the client went away); let the call clean itself up
>             // rather than taking the whole server down.
>             static_cast<CallData*>(tag)->Proceed(ok);
>         }
>     }
>
>     std::unique_ptr<ServerCompletionQueue> cq_;
>     Greeter::AsyncService service_;
>     std::unique_ptr<Server> server_;
> };
>
> // Server entry point: constructs the server and blocks in its event
> // loop; Run() only returns if that loop ever ends.
> int main(int argc, char** argv) {
>     ServerImpl greeterServer;
>     greeterServer.Run();
>
>     return 0;
> }
>
>
> Client Code:
> // Asynchronous client for the server-streaming Greeter::SayHello RPC.
> // SayHello() starts a call; AsyncCompleteRpc() must run (typically on its
> // own thread) to drive the started calls to completion.
> class GreeterClient {
> public:
>     explicit GreeterClient(std::shared_ptr<Channel> channel)
>         : stub_(Greeter::NewStub(channel)) {}
>
>     // Assembles the client's payload and sends it to the server.
>     void SayHello(const std::string& user) {
>         HelloRequest request;
>         // Data we are sending to the server.
>         request.set_name(user);
>
>         // Call object to store rpc data. It deletes itself once the RPC
>         // has finished (see AsyncClientCall::HandleResponse).
>         AsyncClientCall* call = new AsyncClientCall;
>
>         // Store the reader in "call" *before* starting the RPC: once the
>         // call is started, the completion-queue thread may receive the
>         // tag and use call->response_reader at any moment.
>         call->response_reader =
>             stub_->PrepareAsyncSayHello(&call->context, request, &cq_);
>         // The tag is read back as a ResponseHandler*, so convert to that
>         // exact type before it is erased to void*.
>         call->response_reader->StartCall(
>             static_cast<ResponseHandler*>(call));
>     }
>
>     // Loop while listening for completed operations and dispatch each
>     // one to the call object identified by its tag. Returns once the
>     // completion queue is shut down and drained.
>     void AsyncCompleteRpc() {
>         void* got_tag = nullptr;
>         bool ok = false;
>
>         // Block until the next result is available in the completion
>         // queue "cq".
>         while (cq_.Next(&got_tag, &ok)) {
>             // The tag in this example is the memory location of the call
>             // object.
>             ResponseHandler* responseHandler =
>                 static_cast<ResponseHandler*>(got_tag);
>             std::cout << "Tag received: " << responseHandler << std::endl;
>
>             // "ok" reports whether the operation that just completed
>             // (start of the call, a Read, or Finish) succeeded.
>             std::cout << "Next returned: " << ok << std::endl;
>             responseHandler->HandleResponse(ok);
>         }
>     }
>
> private:
>
>     // Interface for objects used as completion-queue tags.
>     class ResponseHandler {
>     public:
>         virtual ~ResponseHandler() = default;
>         // Handles one completion event. Returns true while the handler
>         // is still alive, false once it has finished and destroyed
>         // itself.
>         virtual bool HandleResponse(bool eventStatus) = 0;
>     };
>
>     // Keeps the state and data of one in-flight streaming call.
>     class AsyncClientCall: public ResponseHandler {
>         enum CallStatus {CREATE, PROCESS, FINISH};
>         CallStatus callStatus_;
>     public:
>
>         AsyncClientCall(): callStatus_(CREATE) {}
>
>         // Container for the data we expect from the server.
>         HelloReply reply;
>         // Context for the client. It could be used to convey extra
>         // information to the server and/or tweak certain RPC behaviors.
>         ClientContext context;
>
>         // Storage for the status of the RPC upon completion.
>         Status status;
>
>         // Reader for the stream of replies coming from the server.
>         std::unique_ptr<ClientAsyncReaderInterface<HelloReply>>
>             response_reader;
>
>         // Advances the call by one step: after the call has started,
>         // keep reading replies; when an operation fails (including the
>         // end of the stream), ask for the final status; once the status
>         // has arrived, report it and delete this object.
>         bool HandleResponse(bool responseStatus) override {
>             // Must match the static_cast in AsyncCompleteRpc().
>             void* const tag = static_cast<ResponseHandler*>(this);
>             switch (callStatus_) {
>             case CREATE:
>                 if (responseStatus) {
>                     response_reader->Read(&reply, tag);
>                     callStatus_ = PROCESS;
>                 } else {
>                     response_reader->Finish(&status, tag);
>                     callStatus_ = FINISH;
>                 }
>                 return true;
>             case PROCESS:
>                 if (responseStatus) {
>                     std::cout << "Greeter received: " << this << " : "
>                               << reply.message() << std::endl;
>                     response_reader->Read(&reply, tag);
>                 } else {
>                     response_reader->Finish(&status, tag);
>                     callStatus_ = FINISH;
>                 }
>                 return true;
>             case FINISH:
>                 if (status.ok()) {
>                     std::cout << "Server Response Completed: " << this
>                               << " CallData: " << this << std::endl;
>                 }
>                 else {
>                     std::cout << "RPC failed" << std::endl;
>                 }
>                 delete this;
>                 // Nothing may touch this object past this point.
>                 return false;
>             }
>             return false;
>         }
>     };
>
>     // Out of the passed in Channel comes the stub, stored here, our view
>     // of the server's exposed services.
>     std::unique_ptr<Greeter::Stub> stub_;
>
>     // The producer-consumer queue we use to communicate asynchronously
>     // with the gRPC runtime.
>     CompletionQueue cq_;
> };
>
> // Client entry point: starts the completion-queue reader thread, issues
> // one SayHello call, then waits on the reader thread.
> int main(int argc, char** argv) {
>     // The client needs a channel, out of which the actual RPCs are
>     // created. The channel models a connection to an endpoint (here
>     // localhost at port 50051) and is not authenticated
>     // (InsecureChannelCredentials()).
>     auto channel = grpc::CreateChannel(
>         "localhost:50051", grpc::InsecureChannelCredentials());
>     GreeterClient greeter(channel);
>
>     // Reader thread: loops over completion-queue events indefinitely.
>     std::thread reader(&GreeterClient::AsyncCompleteRpc, &greeter);
>
>     // The actual RPC call!
>     const std::string user = "world";
>     greeter.SayHello(user);
>
>     std::cout << "Press control-c to quit" << std::endl << std::endl;
>     // Blocks forever: the completion queue is never shut down.
>     reader.join();
>
>     return 0;
> }
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/b9550766-a225-40ef-9f5b-3230d9517b17n%40googlegroups.com.

Reply via email to