Hello all,

Sorry for the long post. I am trying to figure out how to use grpc asio 
streaming and I am coming up short. I put together the below client/server from 
code I got from the helloworld example plus an example I saw on the internet. 
It works, but the server only serves a single client. The second client blocks 
forever, waiting to get a response from the server. The first client works just 
fine.  Can someone provide me with some insight into how to fix this so my asio 
server can process multiple clients? Thanks.


PROTO
--------------------------------------------------

syntax = "proto3";
 
package asio_server;
 
// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
 
// The request message containing the user's name.
  message HelloRequest {
  string name = 1;
}
 
// The response message containing the greetings
message HelloReply {
  string message = 1;
}
 
 
 
CLIENT
--------------------------------------------------
 
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
 
#include <iostream>
#include <memory>
#include <string>
#include <grpcpp/grpcpp.h>
#include "asio_server.grpc.pb.h"
 
using namespace grpc;
using namespace asio_server;
 
 
class GreeterClient {
public:
  GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}
 
  std::string SayHello(const std::string& user) {
    HelloRequest request;
    request.set_name(user);
    HelloReply reply;
    ClientContext context;
 
    std::unique_ptr<ClientReader<HelloReply> > reader(
        stub_->SayHello(&context, request));
 
    while (reader->Read(&reply)) {
      std::cout << "Rcvd reply: " << reply.message() <<std::endl;
    }
 
    return std::string("done");
  }
 
private:
  std::unique_ptr<Greeter::Stub> stub_;
};
 
int main(int argc, char** argv) {
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));
  std::string user("world");
  std::string reply = greeter.SayHello(user);
  std::cout << "Greeter received: " << reply << std::endl;
 
  return 0;
}
 
 
 
SERVER
--------------------------------------------------
#include <algorithm>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <chrono>
 
#include <grpc/grpc.h>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>
#include <grpc++/server_context.h>
#include <grpc++/security/server_credentials.h>
#include "asio_server.pb.h"
#include "asio_server.grpc.pb.h"
#include <grpcpp/grpcpp.h>
 
 
using namespace grpc;
using namespace asio_server;
 
 
class ServerImpl final {
public:
    ~ServerImpl() {
        server_->Shutdown();
        // Always shutdown the completion queue after the server.
        cq_->Shutdown();
    }
 
    // There is no shutdown handling in this code.
    void Run() {
        std::string server_address("0.0.0.0:50051");
 
        ServerBuilder builder;
        // Listen on the given address without any authentication mechanism.
        builder.AddListeningPort(server_address, 
grpc::InsecureServerCredentials());
        // Register "service_" as the instance through which we'll communicate 
with
        // clients. In this case it corresponds to an *asynchronous* service.
        builder.RegisterService(&service_);
        // Get hold of the completion queue used for the asynchronous 
communication
        // with the gRPC runtime.
        cq_ = builder.AddCompletionQueue();
        // Finally assemble the server.
        server_ = builder.BuildAndStart();
        std::cout << "Server listening on " << server_address << std::endl;
 
        // Proceed to the server's main loop.
        HandleRpcs();
    }
 
private:
    // Class encompasing the state and logic needed to serve a request.
    class CallData {
    public:
        // Take in the "service" instance (in this case representing an 
asynchronous
        // server) and the completion queue "cq" used for asynchronous 
communication
        // with the gRPC runtime.
        CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
            : service_(service), cq_(cq), repliesSent_(0), responder_(&ctx_), 
status_(CREATE) {
            // Invoke the serving logic right away.
            Proceed();
        }
 
        void Proceed() {
            if (status_ == CREATE) {
                // Make this instance progress to the PROCESS state.
                status_ = PROCESS;
                std::cout << "Creating Call data for new client connections: " 
<< this << std::endl;
                // As part of the initial CREATE state, we *request* that the 
system
                // start processing SayHello requests. In this request, "this" 
acts are
                // the tag uniquely identifying the request (so that different 
CallData
                // instances can serve different requests concurrently), in 
this case
                // the memory address of this CallData instance.
                service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, 
cq_,
                                          (void*) this);
            } else if (status_ == PROCESS) {
                // Spawn a new CallData instance to serve new clients while we 
process
                // the one for this CallData. The instance will deallocate 
itself as
                // part of its FINISH state.
                new CallData(service_, cq_);
 
                // The actual processing.
                std::string prefix("Hello ");
                reply_.set_message(prefix + request_.name() +
                    std::to_string(repliesSent_ + 1));
                std::cout << "Sending reponse: " << this << " : " << 
reply_.message() << std::endl;
                responder_.Write(reply_, this);
                status_ = PROCESSING;
                repliesSent_++;
 
            } else if (status_ == PROCESSING) {
                if (repliesSent_ == MAX_REPLIES) {
                    // And we are done! Let the gRPC runtime know we've 
finished, using the
                    // memory address of this instance as the uniquely 
identifying tag for
                    // the event.
                    status_ = FINISH;
                    responder_.Finish(Status::OK, this);
                } else {
                    // The actual processing.
                    std::string prefix("Hello ");
                    reply_.set_message(prefix + request_.name() + 
std::to_string(repliesSent_ + 1));
                    std::cout << "Sending reponse: " << this << " : " << 
reply_.message() << std::endl;
                    responder_.Write(reply_, this);
                    
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
                    status_ = PROCESSING;
                    repliesSent_++;
                }
            } else {
                GPR_ASSERT(status_ == FINISH);
                std::cout << "Completed RPC for: " << this << std::endl;
                // Once in the FINISH state, deallocate ourselves (CallData).
                delete this;
            }
        }
 
    private:
        // The means of communication with the gRPC runtime for an asynchronous
        // server.
        Greeter::AsyncService* service_;
        // The producer-consumer queue where for asynchronous server 
notifications.
        ServerBuilder* builder_;
       
        ServerCompletionQueue* cq_;
       
        // Context for the rpc, allowing to tweak aspects of it such as the use
        // of compression, authentication, as well as to send metadata back to 
the
        // client.
        ServerContext ctx_;
 
        // What we get from the client.
        HelloRequest request_;
        // What we send back to the client.
        HelloReply reply_;
 
        uint32_t repliesSent_;
        const uint32_t MAX_REPLIES = 1000000000;
 
        // The means to get back to the client.
        ServerAsyncWriter<HelloReply> responder_;
 
        // Let's implement a tiny state machine with the following states.
        enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
        CallStatus status_;  // The current serving state.
    };
 
    // This can be run in multiple threads if needed.
    void HandleRpcs() {
        // Spawn a new CallData instance to serve new clients.
        new CallData(&service_, cq_.get());
        void* tag;  // uniquely identifies a request.
        bool ok;
        while (true) {
            // Block waiting to read the next event from the completion queue. 
The
            // event is uniquely identified by its tag, which in this case is 
the
            // memory address of a CallData instance.
            // The return value of Next should always be checked. This return 
value
            // tells us whether there is any kind of event or cq_ is shutting 
down.
            GPR_ASSERT(cq_->Next(&tag, &ok));
            GPR_ASSERT(ok);
            static_cast<CallData*>(tag)->Proceed();
        }
    }
 
    std::unique_ptr<ServerCompletionQueue> cq_;
    Greeter::AsyncService service_;
    std::unique_ptr<Server> server_;
};
 
int main(int argc, char** argv) {
    ServerImpl server;
    server.Run();
 
    return 0;
}
 
 
OUTPUT
--------------------------------------------------
Server terminal:
~/grpc/examples/cpp/asio$ ./asio_server
Server listening on 0.0.0.0:50051
Creating Call data for new client connections: 0x8a25e0
Creating Call data for new client connections: 0x8aaff0
Sending reponse: 0x8a25e0 : Hello world1
Sending reponse: 0x8a25e0 : Hello world2
Sending reponse: 0x8a25e0 : Hello world3
Sending reponse: 0x8a25e0 : Hello world4
Sending reponse: 0x8a25e0 : Hello world5
Sending reponse: 0x8a25e0 : Hello world6
Sending reponse: 0x8a25e0 : Hello world7
 
Client 1 Terminal:
~/grpc/examples/cpp/asio$ ./asio_client
Rcvd reply: Hello world1
Rcvd reply: Hello world2
Rcvd reply: Hello world3
Rcvd reply: Hello world4
Rcvd reply: Hello world5
Rcvd reply: Hello world6
Rcvd reply: Hello world7
 
Client 2 Terminal:
 
~/grpc/examples/cpp/asio$ ./asio_client
 
 
 // code blocks forever in while(reader->read(&reply))
 
 
 

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/779fd2da-2f59-4fc0-a4f2-adfed1745b27%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to