I have figured out the issue and posted a complete working example in a new 
thread. The problem was that I was issuing a new read request even when the 
read event returned false; a false result indicates the end of the stream, 
and at that point I need to stop reading and wait for the finish event 
instead.

https://groups.google.com/forum/#!topic/grpc-io/2wyoDZT5eao




On Wednesday, 26 April 2017 22:57:49 UTC+5:30, Kuldeep Melligeri wrote:
>
> Thanks, William, for the reply.
>
> I need your help debugging an issue I am facing with this implementation.
>
> I apologize in advance for the amount of detail below, but it should give 
> you the complete context. This is a C++ implementation.
>
> I have tweaked the greeter hello example to perform an async streaming 
> operation. The client initiates a streaming request and then waits for 
> read/finish events. When the server gets the request, it starts sending a 
> stream of replies: it sends 5 replies by calling the write operation and 
> finally calls the "finish" operation.
>
> On the client side, I have registered the call-data pointer as the tag for 
> both the read operation and the finish operation. Since I expect to 
> perform multiple reads, each time a read event is triggered I print the 
> response message and call Read again to register for the next read event. 
> (The last Read call leaves a stale pointer in the completion queue, as I 
> explain below.)
>
> On the server side, after it sends the 5th response it calls Finish. On 
> the client side the finish tag then fires, where I report that the RPC is 
> completed and delete the object. The problem is that the last Read is 
> still registered with a now-stale pointer, and this causes a core dump 
> with the message "pure virtual method called".
>
>
> Here is my code:
> *Protocol Buffer:*
> // The greeting service definition.
> service Greeter {
>   // Sends a greeting
>   rpc SayHello (HelloRequest) returns (stream HelloReply) {}
> }
>
> // The request message containing the user's name.
> message HelloRequest {
>   string name = 1;
> }
>
> // The response message containing the greetings
> message HelloReply {
>   string message = 1;
> }
>
> *Client Side code:*
> class GreeterClient {
>   public:
>     explicit GreeterClient(std::shared_ptr<Channel> channel)
>             : stub_(Greeter::NewStub(channel)) {}
>
>     // Assembles the client's payload and sends it to the server.
>     void SayHello(const std::string& user) {
>         HelloRequest request;
>         // Data we are sending to the server.
>         request.set_name(user);
>
>         // Call object to store rpc data
>         AsyncClientCall* call = new AsyncClientCall;
>         AsyncClientCallCloser* callCloser = new 
> AsyncClientCallCloser(call);
>
>         // stub_->AsyncSayHello() performs the RPC call, returning an 
>         // instance to store in "call". Because we are using the 
>         // asynchronous API, we need to hold on to the "call" instance 
>         // in order to get updates on the ongoing RPC.
>         call->response_reader = stub_->AsyncSayHello(&call->context, 
>             request, &cq_, (void*)call);
>
>         // Request that, upon the next message from the server, "reply" 
>         // be updated with its contents. Tag the request with the memory 
>         // address of the call object.
>         call->response_reader->Read(&call->reply, (void*)call);
>
>         // Request that, upon completion of the RPC, "status" be updated 
>         // with an indication of whether the operation was successful. 
>         // Tag the request with the callCloser object; when I get the 
>         // finish event, I delete callCloser and also the call object.
>         call->response_reader->Finish(&callCloser->status, 
>             (void*)callCloser);
>     }
>
>     // Loop while listening for completed responses.
>     // Prints out the response from the server.
>     void AsyncCompleteRpc() {
>         void* got_tag;
>         bool ok = false;
>
>         // Block until the next result is available in the completion 
>         // queue "cq".
>         while (cq_.Next(&got_tag, &ok)) {
>             // The tag in this example is the memory location of the 
>             // call object.
>             ResponseHandler* responseHandler = 
>                 static_cast<ResponseHandler*>(got_tag);
>
>             // Verify that the request was completed successfully.
>             GPR_ASSERT(ok);
>             if (responseHandler->RequestSent()) { 
>                 responseHandler->HandleResponse();
>             } else {
>                 std::cout << "Hello Request sent successfully" << 
> std::endl;
>                 responseHandler->SetRequestSent();
>             }
>         }
>     }
>
>   private:
>
>     class ResponseHandler {
>     public:
>         virtual bool HandleResponse() = 0;
>         virtual bool RequestSent() { return true; }
>         virtual void SetRequestSent() {}
>     };
>
>     // struct for keeping state and data information
>     class AsyncClientCall: public ResponseHandler {
>         // Request sent event
>         bool requestSent;
>     public:
>
>         AsyncClientCall(): requestSent(false) {}
>
>         // Container for the data we expect from the server.
>         HelloReply reply;
>
>         // Context for the client. It could be used to convey extra 
>         // information to the server and/or tweak certain RPC behaviors.
>         ClientContext context;
>
>         // Storage for the status of the RPC upon completion.
>         Status status;
>
>         //std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
>         std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> response_reader;
>
>         bool HandleResponse() override {
>             std::cout << "Greeter received: " << reply.message() 
>                 << std::endl;
>             // Here I read the response and call the Read API to get an 
>             // event on the next read, but the last Read registration 
>             // remains a stale entry: the object gets deleted when the 
>             // server sends Finish, and then I crash while processing 
>             // the completion queue.
>             response_reader->Read(&reply, (void*)this);
>             return true;
>         }
>
>         bool RequestSent() override {
>             return requestSent;
>         }
>
>         void SetRequestSent() override {
>             requestSent = true;
>         }
>     };
>
>     // struct for keeping state and data information
>     class AsyncClientCallCloser: public ResponseHandler {
>     private:
>         AsyncClientCall *m_call;
>     public:
>         AsyncClientCallCloser(AsyncClientCall *call): m_call(call) {}
>
>         // Storage for the status of the RPC upon completion.
>         Status status;
>
>         bool HandleResponse() override {
>             if (status.ok()) {
>                 std::cout << "Server Response Completed" << std::endl;
>             } else {
>                 std::cout << "RPC failed" << std::endl;
>             }
>             delete m_call;
>             delete this;
>             return true;
>         }
>     };
>
>       
>     // Out of the passed-in Channel comes the stub, stored here: our view 
>     // of the server's exposed services.
>     std::unique_ptr<Greeter::Stub> stub_;
>
>     // The producer-consumer queue we use to communicate asynchronously 
>     // with the gRPC runtime.
>     CompletionQueue cq_;
> };
>
> int main(int argc, char** argv) {
>
>
>     // Instantiate the client. It requires a channel, out of which the 
>     // actual RPCs are created. This channel models a connection to an 
>     // endpoint (in this case, localhost at port 50051). We indicate that 
>     // the channel isn't authenticated (InsecureChannelCredentials()).
>     GreeterClient greeter(grpc::CreateChannel(
>             "localhost:50051", grpc::InsecureChannelCredentials()));
>
>     // Spawn reader thread that loops indefinitely
>     std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, 
> &greeter);
>
>     std::string user("world");
>     greeter.SayHello(user);  // The actual RPC call!
>
>     std::cout << "Press control-c to quit" << std::endl << std::endl;
>     thread_.join();  //blocks forever
>
>     return 0;
> }
>
> *Server Side Code:*
> class ServerImpl final {
> public:
>     ~ServerImpl() {
>         server_->Shutdown();
>         // Always shutdown the completion queue after the server.
>         cq_->Shutdown();
>     }
>
>     // There is no shutdown handling in this code.
>     void Run() {
>         std::string server_address("0.0.0.0:50051");
>
>         ServerBuilder builder;
>         // Listen on the given address without any authentication 
>         // mechanism.
>         builder.AddListeningPort(server_address, 
>             grpc::InsecureServerCredentials());
>         // Register "service_" as the instance through which we'll 
>         // communicate with clients. In this case it corresponds to an 
>         // *asynchronous* service.
>         builder.RegisterService(&service_);
>         // Get hold of the completion queue used for asynchronous 
>         // communication with the gRPC runtime.
>         cq_ = builder.AddCompletionQueue();
>         // Finally assemble the server.
>         server_ = builder.BuildAndStart();
>         std::cout << "Server listening on " << server_address << std::endl;
>
>         // Proceed to the server's main loop.
>         HandleRpcs();
>     }
>
> private:
>     // Class encompassing the state and logic needed to serve a request.
>     class CallData {
>     public:
>         // Take in the "service" instance (in this case representing an 
>         // asynchronous server) and the completion queue "cq" used for 
>         // asynchronous communication with the gRPC runtime.
>         CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
>             : service_(service), cq_(cq), repliesSent_(0), 
>               responder_(&ctx_), status_(CREATE) {
>             // Invoke the serving logic right away.
>             Proceed();
>         }
>
>         void Proceed() {
>             if (status_ == CREATE) {
>                 // Make this instance progress to the PROCESS state.
>                 status_ = PROCESS;
>                 std::cout << "Creating Call data for new client " 
>                     << "connections: " << this << std::endl;
>                 // As part of the initial CREATE state, we *request* that 
>                 // the system start processing SayHello requests. In this 
>                 // request, "this" acts as the tag uniquely identifying 
>                 // the request (so that different CallData instances can 
>                 // serve different requests concurrently); in this case 
>                 // it is the memory address of this CallData instance.
>                 service_->RequestSayHello(&ctx_, &request_, &responder_, 
> cq_, cq_,
>                                           (void*) this);
>             } else if (status_ == PROCESS) {
>                 // Spawn a new CallData instance to serve new clients 
>                 // while we process the one for this CallData. The 
>                 // instance will deallocate itself as part of its FINISH 
>                 // state.
>                 new CallData(service_, cq_);
>
>                 // The actual processing.
>                 std::string prefix("Hello ");
>                 reply_.set_message(prefix + request_.name() +
>                     std::to_string(repliesSent_ + 1));
>                 std::cout << "Sending response: " << this << " : " 
>                     << reply_.message() << std::endl;
>                 responder_.Write(reply_, this);
>                 status_ = PROCESSING;
>                 repliesSent_++;
>
>             } else if (status_ == PROCESSING) {
>                 if (repliesSent_ == MAX_REPLIES) {
>                     // And we are done! Let the gRPC runtime know we've 
>                     // finished, using the memory address of this 
>                     // instance as the uniquely identifying tag for the 
>                     // event.
>                     status_ = FINISH;
>                     responder_.Finish(Status::OK, this);
>                 } else {
>                     // The actual processing.
>                     std::string prefix("Hello ");
>                     reply_.set_message(prefix + request_.name() + 
>                         std::to_string(repliesSent_ + 1));
>                     std::cout << "Sending response: " << this << " : " 
>                         << reply_.message() << std::endl;
>                     responder_.Write(reply_, this);
>                     status_ = PROCESSING;
>                     repliesSent_++;
>                 }
>             } else {
>                 GPR_ASSERT(status_ == FINISH);
>                 std::cout << "Completed RPC for: " << this << std::endl;
>                 // Once in the FINISH state, deallocate ourselves 
>                 // (CallData).
>                 delete this;
>             }
>         }
>
>     private:
>         // The means of communication with the gRPC runtime for an 
>         // asynchronous server.
>         Greeter::AsyncService* service_;
>         // The producer-consumer queue for asynchronous server 
>         // notifications.
>         ServerCompletionQueue* cq_;
>         // Context for the rpc, allowing us to tweak aspects of it such 
>         // as the use of compression and authentication, as well as to 
>         // send metadata back to the client.
>         ServerContext ctx_;
>
>         // What we get from the client.
>         HelloRequest request_;
>         // What we send back to the client.
>         HelloReply reply_;
>     
>         uint32_t repliesSent_;
>         const uint32_t MAX_REPLIES = 5;
>
>         // The means to get back to the client.
>         ServerAsyncWriter<HelloReply> responder_;
>
>         // Let's implement a tiny state machine with the following states.
>         enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
>         CallStatus status_;  // The current serving state.
>     };
>
>     // This can be run in multiple threads if needed.
>     void HandleRpcs() {
>         // Spawn a new CallData instance to serve new clients.
>         new CallData(&service_, cq_.get());
>         void* tag;  // uniquely identifies a request.
>         bool ok;
>         while (true) {
>             // Block waiting to read the next event from the completion 
>             // queue. The event is uniquely identified by its tag, which 
>             // in this case is the memory address of a CallData instance.
>             // The return value of Next should always be checked: it 
>             // tells us whether there is a new event or cq_ is shutting 
>             // down.
>             GPR_ASSERT(cq_->Next(&tag, &ok));
>             GPR_ASSERT(ok);
>             static_cast<CallData*>(tag)->Proceed();
>         }
>     }
>
>     std::unique_ptr<ServerCompletionQueue> cq_;
>     Greeter::AsyncService service_;
>     std::unique_ptr<Server> server_;
> };
>
> int main(int argc, char** argv) {
>     ServerImpl server;
>     server.Run();
>
>     return 0;
> }
>
> *Output:*
> *Server Side:*
> kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$ 
> ./greeter_async_server 
> Server listening on 0.0.0.0:50051
> Creating Call data for new client connections: 0xb0b5e0
> Creating Call data for new client connections: 0xb07970
> Sending response: 0xb0b5e0 : Hello world1
> Sending response: 0xb0b5e0 : Hello world2
> Sending response: 0xb0b5e0 : Hello world3
> Sending response: 0xb0b5e0 : Hello world4
> Sending response: 0xb0b5e0 : Hello world5
> Completed RPC for: 0xb0b5e0
>
> Client Side:
> kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$ 
> ./greeter_async_client2 
> Press control-c to quit
>
> Hello Request sent successfully
> Greeter received: Hello world1
> Greeter received: Hello world2
> Greeter received: Hello world3
> Greeter received: Hello world4
> Greeter received: Hello world5
> Server Response Completed
> pure virtual method called
> terminate called without an active exception
> *Aborted (core dumped)*
>
>
> (gdb) bt
> #0  0x00007ffff6b4bc37 in __GI_raise (sig=sig@entry=6)
>     at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
> #1  0x00007ffff6b4f028 in __GI_abort () at abort.c:89
> #2  0x00007ffff7153535 in __gnu_cxx::__verbose_terminate_handler() ()
>    from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #3  0x00007ffff71516d6 in ?? () from 
> /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #4  0x00007ffff7151703 in std::terminate() ()
>    from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> *#5  0x00007ffff71521bf in __cxa_pure_virtual ()*
>    from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #6  0x00007ffff7bb6e83 in grpc::CompletionQueue::AsyncNextInternal(void**, 
> bool*, gpr_timespec) () from /usr/local/lib/libgrpc++.so.3
> #7  0x000000000041368f in grpc::CompletionQueue::Next 
> (this=0x7fffffffde28, 
>     tag=0x7ffff63e7d40, ok=0x7ffff63e7d3f)
>     at /usr/local/include/grpc++/impl/codegen/completion_queue.h:153
> #8  0x0000000000413b83 in GreeterClient::AsyncCompleteRpc 
> (this=0x7fffffffde20)
>     at greeter_async_client2.cc:92
> #9  0x0000000000415b0b in std::_Mem_fn<void 
> (GreeterClient::*)()>::operator()<, void>(GreeterClient*) const 
> (this=0x6258f8, __object=0x7fffffffde20)
>     at /usr/include/c++/4.8/functional:601
> #10 0x0000000000415a5b in std::_Bind_simple<std::_Mem_fn<void 
> (GreeterClient::*)()> 
> (GreeterClient*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x6258f0)
>     at /usr/include/c++/4.8/functional:1732
> #11 0x0000000000415963 in std::_Bind_simple<std::_Mem_fn<void 
> (GreeterClient::*)()> (GreeterClient*)>::operator()() (this=0x6258f0)
>     at /usr/include/c++/4.8/functional:1720
> #12 0x00000000004158fc in 
> std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void 
> (GreeterClient::*)()> (GreeterClient*)> >::_M_run() (this=0x6258d8)
>     at /usr/include/c++/4.8/thread:115
> #13 0x00007ffff71a4a60 in ?? () from 
> /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #14 0x00007ffff63f1184 in start_thread (arg=0x7ffff63e8700)
>     at pthread_create.c:312
> #15 0x00007ffff6c12bed in clone ()
>     at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
>
>
>
>
>
>
>
>
> On Wednesday, 26 April 2017 19:39:19 UTC+5:30, William Thurston wrote:
>>
>> This is how I write similar APIs, with one caveat. I typically complete 
>> or close with CANCEL the connections randomly somewhere between 15-30 
>> minutes after initial request to force my clients to handle reconnection. 
>> It also helps quite a bit with load balancing and deployments of new server 
>> code to know your clients are already coded to handle disconnections as a 
>> common occurrence. 
>>
>> William Thurston
>>
>> On Apr 26, 2017, at 12:44 AM, Kuldeep Melligeri <[email protected]> 
>> wrote:
>>
>> Hi,
>>
>> I am implementing a service where the client is supposed to subscribe to 
>> some information on the server. I am planning to implement it with the 
>> async streaming APIs.
>> When the server gets a client's request for the first time, it starts 
>> writing back to the client as and when data becomes available from 
>> different sources.
>> To keep the RPC alive, I only call "write" whenever data is available. I 
>> am planning to call "finish" only when the server shuts down.
>>
>> Is this the general practice for a subscription-based service? Or is 
>> there a better way to do this job?
>>
>> Thanks
>> Kuldeep
>>
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "grpc.io" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/grpc-io.
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/grpc-io/dc37f9f3-7b7b-4f3c-b423-e628d68c8074%40googlegroups.com
>>  
>> <https://groups.google.com/d/msgid/grpc-io/dc37f9f3-7b7b-4f3c-b423-e628d68c8074%40googlegroups.com?utm_medium=email&utm_source=footer>
>> .
>> For more options, visit https://groups.google.com/d/optout.
>>
>>

