I have figured out the issue and posted the complete working code in a new thread. The issue was that I was setting up a new read request even when the read event returned false. A false read event indicates the end of the stream, and at that point I need to request the finish event instead of another read.
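For anyone reading this later, the control flow of the fix can be sketched roughly as below. This is a self-contained mock, not gRPC code: `MockStream`, `Event`, `Op` and `RunClient` are hypothetical names made up for illustration, and the mock only imitates the part of the behaviour described above (a read completing with ok == false once the server has no more messages).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; none of these are gRPC types.
enum class Op { kRead, kFinish };

struct Event {
  Op op;                // which requested operation completed
  bool ok;              // false on a read means no more messages will arrive
  std::string payload;  // message carried by a successful read
};

// Mock stream: completes requested operations in the order they were requested.
class MockStream {
 public:
  explicit MockStream(std::vector<std::string> messages)
      : messages_(std::move(messages)) {}

  void RequestRead() { pending_.push_back(Op::kRead); }
  void RequestFinish() { pending_.push_back(Op::kFinish); }

  // Delivers the next completed operation; returns false if none is pending.
  bool Next(Event* ev) {
    if (pending_.empty()) return false;
    ev->op = pending_.front();
    pending_.pop_front();
    ev->payload.clear();
    ev->ok = true;
    if (ev->op == Op::kRead) {
      if (next_ < messages_.size()) {
        ev->payload = messages_[next_++];
      } else {
        ev->ok = false;  // stream exhausted: the read "fails"
      }
    }
    return true;
  }

  std::size_t pending() const { return pending_.size(); }

 private:
  std::vector<std::string> messages_;
  std::size_t next_ = 0;
  std::deque<Op> pending_;
};

// Client loop: re-arm the read only after a successful read, and request
// finish exactly once, when a read comes back with ok == false.
std::vector<std::string> RunClient(MockStream& stream) {
  std::vector<std::string> log;
  stream.RequestRead();  // first read, issued once the call has started
  Event ev;
  while (stream.Next(&ev)) {
    if (ev.op == Op::kRead) {
      if (ev.ok) {
        log.push_back("received: " + ev.payload);
        stream.RequestRead();
      } else {
        log.push_back("end of stream");
        stream.RequestFinish();  // no read is left outstanding
      }
    } else {
      log.push_back("finished");
      // Nothing else is pending here, so per-call state could be freed safely.
      assert(stream.pending() == 0);
    }
  }
  return log;
}
```

With two messages queued in the mock, `RunClient` logs the two received messages, then "end of stream", then "finished". In the real client the same idea would presumably mean passing `ok` into the handler instead of asserting on it, and calling Finish from the read handler when `ok` is false rather than up front; please treat that mapping as my assumption and check the working code in the new thread.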
https://groups.google.com/forum/#!topic/grpc-io/2wyoDZT5eao

On Wednesday, 26 April 2017 22:57:49 UTC+5:30, Kuldeep Melligeri wrote:
>
> Thanks William for the reply,
>
> I need your help debugging an issue which I am facing with this
> implementation.
>
> I apologize in advance for the length; I am going to write out the details
> of what I am facing so that you get the complete context. This is a C++
> implementation.
>
> I have tweaked the greeter hello example to perform an async streaming
> operation. The client initiates a stream request and waits on read/finish
> on the client side. On the server side, when it gets the request, it starts
> sending the stream of replies: it sends 5 replies by calling the write
> operation and finally it calls the "finish" operation.
>
> On the client side, I have registered the call data pointer as the tag for
> the read operation and the finish operation. As I expect to perform multiple
> reads, each time the read event is triggered I print the response message
> and call Read again to receive the next read event (the last Read call
> leaves a stale pointer in the completion queue; I explain this in the next
> paragraph).
>
> On the server side, after it sends the 5th response, it calls Finish. On
> the client side, the finish tag is delivered, where I report that the RPC is
> completed and delete the object. The problem is that the last Read is still
> registered with the now stale pointer, and this causes a core dump with the
> message "pure virtual method called".
>
>
> Here is my code:
> *Protocol Buffer:*
> // The greeting service definition.
> service Greeter {
>   // Sends a greeting
>   rpc SayHello (HelloRequest) returns (stream HelloReply) {}
> }
>
> // The request message containing the user's name.
> message HelloRequest {
>   string name = 1;
> }
>
> // The response message containing the greetings
> message HelloReply {
>   string message = 1;
> }
>
> *Client Side code:*
> class GreeterClient {
>  public:
>   explicit GreeterClient(std::shared_ptr<Channel> channel)
>       : stub_(Greeter::NewStub(channel)) {}
>
>   // Assembles the client's payload and sends it to the server.
>   void SayHello(const std::string& user) {
>     HelloRequest request;
>     // Data we are sending to the server.
>     request.set_name(user);
>
>     // Call object to store rpc data
>     AsyncClientCall* call = new AsyncClientCall;
>     AsyncClientCallCloser* callCloser = new AsyncClientCallCloser(call);
>
>     // stub_->AsyncSayHello() performs the RPC call, returning an instance to
>     // store in "call". Because we are using the asynchronous API, we need to
>     // hold on to the "call" instance in order to get updates on the ongoing RPC.
>     call->response_reader = stub_->AsyncSayHello(&call->context, request, &cq_, (void *)call);
>
>     // Request that, upon completion of the RPC, "reply" be updated with the
>     // server's response; "status" with the indication of whether the operation
>     // was successful. Tag the request with the memory address of the call object.
>     call->response_reader->Read(&call->reply, (void*)call);
>
>     // Request that, upon completion of the RPC, "reply" be updated with the
>     // server's response; "status" with the indication of whether the operation
>     // was successful. Tag the request with the memory address of the call object.
>     call->response_reader->Finish(&callCloser->status, (void*)callCloser);
>     /// When I get finish, I will delete callClosure and also the call object.
>   }
>
>   // Loop while listening for completed responses.
>   // Prints out the response from the server.
>   void AsyncCompleteRpc() {
>     void* got_tag;
>     bool ok = false;
>
>     // Block until the next result is available in the completion queue "cq".
>     while (cq_.Next(&got_tag, &ok)) {
>       // The tag in this example is the memory location of the call object
>       ResponseHandler* responseHandler = static_cast<ResponseHandler*>(got_tag);
>
>       // Verify that the request was completed successfully. Note that "ok"
>       // corresponds solely to the request for updates introduced by Finish().
>       GPR_ASSERT(ok);
>       if (responseHandler->RequestSent()) {
>         responseHandler->HandleResponse();
>       } else {
>         std::cout << "Hello Request sent successfully" << std::endl;
>         responseHandler->SetRequestSent();
>       }
>     }
>   }
>
>  private:
>
>   class ResponseHandler {
>    public:
>     virtual bool HandleResponse() = 0;
>     virtual bool RequestSent() {return true;} ;
>     virtual void SetRequestSent() {};
>   };
>
>   // struct for keeping state and data information
>   class AsyncClientCall: public ResponseHandler {
>     // Request sent event
>     bool requestSent;
>    public:
>
>     AsyncClientCall(): requestSent(false) {}
>
>     // Container for the data we expect from the server.
>     HelloReply reply;
>
>     // Context for the client. It could be used to convey extra information to
>     // the server and/or tweak certain RPC behaviors.
>     ClientContext context;
>
>     // Storage for the status of the RPC upon completion.
>     Status status;
>
>     //std::unique_ptr<ClientAsyncResponseReader<HelloReply>> response_reader;
>     std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> response_reader;
>
>     bool HandleResponse() override {
>       std::cout << "Greeter received: " << reply.message() << std::endl;
>       *response_reader->Read(&reply, (void*)this); *
>       // Here I read the response and call Read API to get event on next Read,
>       // but the last Read registration will remain as stale entry, the object
>       // gets deleted when server sends Finish, then I get crash on processing
>       // of completion queue
>
>     }
>
>     bool RequestSent() override {
>       return requestSent;
>     }
>
>     void SetRequestSent() override {
>       requestSent = true;
>     }
>   };
>
>   // struct for keeping state and data information
>   class AsyncClientCallCloser: public ResponseHandler {
>    private:
>     AsyncClientCall *m_call;
>    public:
>     AsyncClientCallCloser(AsyncClientCall *call): m_call(call) {}
>
>     // Storage for the status of the RPC upon completion.
>     Status status;
>
>     bool HandleResponse() override {
>       if (status.ok()) {
>         std::cout << "Server Response Completed" << std::endl;
>       }
>       else {
>         std::cout << "RPC failed" << std::endl;
>       }
>       delete m_call;
>       delete this;
>     }
>   };
>
>
>   // Out of the passed in Channel comes the stub, stored here, our view of the
>   // server's exposed services.
>   std::unique_ptr<Greeter::Stub> stub_;
>
>   // The producer-consumer queue we use to communicate asynchronously with the
>   // gRPC runtime.
>   CompletionQueue cq_;
> };
>
> int main(int argc, char** argv) {
>
>
>   // Instantiate the client. It requires a channel, out of which the actual RPCs
>   // are created. This channel models a connection to an endpoint (in this case,
>   // localhost at port 50051). We indicate that the channel isn't authenticated
>   // (use of InsecureChannelCredentials()).
>   GreeterClient greeter(grpc::CreateChannel(
>       "localhost:50051", grpc::InsecureChannelCredentials()));
>
>   // Spawn reader thread that loops indefinitely
>   std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter);
>
>   std::string user("world");
>   greeter.SayHello(user);  // The actual RPC call!
>
>   std::cout << "Press control-c to quit" << std::endl << std::endl;
>   thread_.join();  //blocks forever
>
>   return 0;
> }
>
> *Server Side Code:*
> class ServerImpl final {
>  public:
>   ~ServerImpl() {
>     server_->Shutdown();
>     // Always shutdown the completion queue after the server.
>     cq_->Shutdown();
>   }
>
>   // There is no shutdown handling in this code.
>   void Run() {
>     std::string server_address("0.0.0.0:50051");
>
>     ServerBuilder builder;
>     // Listen on the given address without any authentication mechanism.
>     builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
>     // Register "service_" as the instance through which we'll communicate with
>     // clients. In this case it corresponds to an *asynchronous* service.
>     builder.RegisterService(&service_);
>     // Get hold of the completion queue used for the asynchronous communication
>     // with the gRPC runtime.
>     cq_ = builder.AddCompletionQueue();
>     // Finally assemble the server.
>     server_ = builder.BuildAndStart();
>     std::cout << "Server listening on " << server_address << std::endl;
>
>     // Proceed to the server's main loop.
>     HandleRpcs();
>   }
>
>  private:
>   // Class encompasing the state and logic needed to serve a request.
>   class CallData {
>    public:
>     // Take in the "service" instance (in this case representing an asynchronous
>     // server) and the completion queue "cq" used for asynchronous communication
>     // with the gRPC runtime.
>     CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
>         : service_(service), cq_(cq), repliesSent_(0),
>           responder_(&ctx_), status_(CREATE) {
>       // Invoke the serving logic right away.
>       Proceed();
>     }
>
>     void Proceed() {
>       if (status_ == CREATE) {
>         // Make this instance progress to the PROCESS state.
>         status_ = PROCESS;
>         std::cout << "Creating Call data for new client connections: " << this << std::endl;
>         // As part of the initial CREATE state, we *request* that the system
>         // start processing SayHello requests. In this request, "this" acts are
>         // the tag uniquely identifying the request (so that different CallData
>         // instances can serve different requests concurrently), in this case
>         // the memory address of this CallData instance.
>         service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
>                                   (void*) this);
>       } else if (status_ == PROCESS) {
>         // Spawn a new CallData instance to serve new clients while we process
>         // the one for this CallData. The instance will deallocate itself as
>         // part of its FINISH state.
>         new CallData(service_, cq_);
>
>         // The actual processing.
>         std::string prefix("Hello ");
>         reply_.set_message(prefix + request_.name() + std::to_string(repliesSent_ + 1));
>         std::cout << "Sending reponse: " << this << " : " << reply_.message() << std::endl;
>         responder_.Write(reply_, this);
>         status_ = PROCESSING;
>         repliesSent_++;
>
>       } else if (status_ == PROCESSING) {
>         if (repliesSent_ == MAX_REPLIES) {
>           // And we are done! Let the gRPC runtime know we've finished, using the
>           // memory address of this instance as the uniquely identifying tag for
>           // the event.
>           status_ = FINISH;
>           responder_.Finish(Status::OK, this);
>         } else {
>           // The actual processing.
>           std::string prefix("Hello ");
>           reply_.set_message(prefix + request_.name() + std::to_string(repliesSent_ + 1));
>           std::cout << "Sending reponse: " << this << " : " << reply_.message() << std::endl;
>           responder_.Write(reply_, this);
>           status_ = PROCESSING;
>           repliesSent_++;
>         }
>       } else {
>         GPR_ASSERT(status_ == FINISH);
>         std::cout << "Completed RPC for: " << this << std::endl;
>         // Once in the FINISH state, deallocate ourselves (CallData).
>         delete this;
>       }
>     }
>
>    private:
>     // The means of communication with the gRPC runtime for an asynchronous
>     // server.
>     Greeter::AsyncService* service_;
>     // The producer-consumer queue where for asynchronous server notifications.
>     ServerCompletionQueue* cq_;
>     // Context for the rpc, allowing to tweak aspects of it such as the use
>     // of compression, authentication, as well as to send metadata back to the
>     // client.
>     ServerContext ctx_;
>
>     // What we get from the client.
>     HelloRequest request_;
>     // What we send back to the client.
>     HelloReply reply_;
>
>     uint32_t repliesSent_;
>     const uint32_t MAX_REPLIES = 5;
>
>     // The means to get back to the client.
>     ServerAsyncWriter<HelloReply> responder_;
>
>     // Let's implement a tiny state machine with the following states.
>     enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
>     CallStatus status_;  // The current serving state.
>   };
>
>   // This can be run in multiple threads if needed.
>   void HandleRpcs() {
>     // Spawn a new CallData instance to serve new clients.
>     new CallData(&service_, cq_.get());
>     void* tag;  // uniquely identifies a request.
>     bool ok;
>     while (true) {
>       // Block waiting to read the next event from the completion queue. The
>       // event is uniquely identified by its tag, which in this case is the
>       // memory address of a CallData instance.
>       // The return value of Next should always be checked. This return value
>       // tells us whether there is any kind of event or cq_ is shutting down.
>       GPR_ASSERT(cq_->Next(&tag, &ok));
>       GPR_ASSERT(ok);
>       static_cast<CallData*>(tag)->Proceed();
>     }
>   }
>
>   std::unique_ptr<ServerCompletionQueue> cq_;
>   Greeter::AsyncService service_;
>   std::unique_ptr<Server> server_;
> };
>
> int main(int argc, char** argv) {
>   ServerImpl server;
>   server.Run();
>
>   return 0;
> }
>
> *Output:*
> *Server Side:*
> kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$ ./greeter_async_server
> Server listening on 0.0.0.0:50051
> Creating Call data for new client connections: 0xb0b5e0
> Creating Call data for new client connections: 0xb07970
> Sending reponse: 0xb0b5e0 : Hello world1
> Sending reponse: 0xb0b5e0 : Hello world2
> Sending reponse: 0xb0b5e0 : Hello world3
> Sending reponse: 0xb0b5e0 : Hello world4
> Sending reponse: 0xb0b5e0 : Hello world5
> Completed RPC for: 0xb0b5e0
>
> Client Side:
> kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$ ./greeter_async_client2
> Press control-c to quit
>
> Hello Request sent successfully
> Greeter received: Hello world1
> Greeter received: Hello world2
> Greeter received: Hello world3
> Greeter received: Hello world4
> Greeter received: Hello world5
> Server Response Completed
> pure virtual method called
> terminate called without an active exception
> *Aborted (core dumped)*
>
>
> (gdb) bt
> #0  0x00007ffff6b4bc37 in __GI_raise (sig=sig@entry=6)
>     at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
> #1  0x00007ffff6b4f028 in __GI_abort () at abort.c:89
> #2  0x00007ffff7153535 in __gnu_cxx::__verbose_terminate_handler() ()
>     from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #3  0x00007ffff71516d6 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #4  0x00007ffff7151703 in std::terminate() ()
>     from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> *#5  0x00007ffff71521bf in __cxa_pure_virtual ()*
>     from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #6  0x00007ffff7bb6e83 in grpc::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
>     from /usr/local/lib/libgrpc++.so.3
> #7  0x000000000041368f in grpc::CompletionQueue::Next (this=0x7fffffffde28,
>     tag=0x7ffff63e7d40, ok=0x7ffff63e7d3f)
>     at /usr/local/include/grpc++/impl/codegen/completion_queue.h:153
> #8  0x0000000000413b83 in GreeterClient::AsyncCompleteRpc (this=0x7fffffffde20)
>     at greeter_async_client2.cc:92
> #9  0x0000000000415b0b in std::_Mem_fn<void (GreeterClient::*)()>::operator()<, void>(GreeterClient*) const
>     (this=0x6258f8, __object=0x7fffffffde20)
>     at /usr/include/c++/4.8/functional:601
> #10 0x0000000000415a5b in std::_Bind_simple<std::_Mem_fn<void (GreeterClient::*)()> (GreeterClient*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>)
>     (this=0x6258f0)
>     at /usr/include/c++/4.8/functional:1732
> #11 0x0000000000415963 in std::_Bind_simple<std::_Mem_fn<void (GreeterClient::*)()> (GreeterClient*)>::operator()()
>     (this=0x6258f0)
>     at /usr/include/c++/4.8/functional:1720
> #12 0x00000000004158fc in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (GreeterClient::*)()> (GreeterClient*)> >::_M_run()
>     (this=0x6258d8)
>     at /usr/include/c++/4.8/thread:115
> #13 0x00007ffff71a4a60 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
> #14 0x00007ffff63f1184 in start_thread (arg=0x7ffff63e8700)
>     at pthread_create.c:312
> #15 0x00007ffff6c12bed in clone ()
>     at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
>
>
> On Wednesday, 26 April 2017 19:39:19 UTC+5:30, William Thurston wrote:
>>
>> This is how I write similar APIs, with one caveat.
>> I typically complete or close with CANCEL the connections randomly,
>> somewhere between 15 and 30 minutes after the initial request, to force my
>> clients to handle reconnection. It also helps quite a bit with load
>> balancing and deployments of new server code to know your clients are
>> already coded to handle disconnections as a common occurrence.
>>
>> William Thurston
>>
>> On Apr 26, 2017, at 12:44 AM, Kuldeep Melligeri <[email protected]> wrote:
>>
>> Hi,
>>
>> I am implementing a service where the client is supposed to subscribe for
>> some information on the server. I am planning to implement it with the
>> async streaming APIs.
>> When the server gets the client request for the first time, it starts
>> writing back to the client as and when data becomes available from
>> different sources.
>> To keep the RPC alive, I only call "write" whenever data is available. I am
>> planning to call finish only when the server shuts down.
>>
>> Is this the general practice for a subscription-based service? Or is there
>> a better way to do this job?
>>
>> Thanks
>> Kuldeep
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "grpc.io" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/grpc-io.
>> To view this discussion on the web visit
>> https://groups.google.com/d/msgid/grpc-io/dc37f9f3-7b7b-4f3c-b423-e628d68c8074%40googlegroups.com.
>> For more options, visit https://groups.google.com/d/optout.
