Thanks William for reply,
I need your help debugging the issue which I am facing with this
implementation.
I extremely apologies as I am going to write more details of what I am
facing so that you will get the complete context. This is a C++
implementation.
I have tweaked, greeter hello example to perform async streaming operation.
What I have done is to initiate a stream request from client and wait on
read/finish on the client side. On server side, when it gets request, it
will start sending the stream of replies, it will send 5 replies by calling
write operation and finally it will call "finish" operation.
On client side, I have registered the call data pointer as tag to perform
read operation and finish operation. As I expect to perform multiple reads,
each time the read event is triggered, I will print the response message
and again call Read to get read to receive next read event (The last Read
call cause a stale pointer in completion queue, I will explain in the next
line).
On server side, after it sends the 5th response, it will call Finish, on
client side, the finish tag gets called, where I will say RPC is completed
and delete the object, but the problem here is that last Read is registered
with stale pointer and this is causing core dump, with message call to pure
virtual function.
Here is my code:
*Protocol Buffer:*
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
*Client Side code:*
class GreeterClient {
public:
explicit GreeterClient(std::shared_ptr<Channel> channel)
: stub_(Greeter::NewStub(channel)) {}
// Assembles the client's payload and sends it to the server.
void SayHello(const std::string& user) {
HelloRequest request;
// Data we are sending to the server.
request.set_name(user);
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
AsyncClientCallCloser* callCloser = new AsyncClientCallCloser(call);
// stub_->AsyncSayHello() performs the RPC call, returning an
instance to
// store in "call". Because we are using the asynchronous API, we
need to
// hold on to the "call" instance in order to get updates on the
ongoing RPC.
call->response_reader = stub_->AsyncSayHello(&call->context,
request, &cq_, (void *)call);
// Request that, upon completion of the RPC, "reply" be updated
with the
// server's response; "status" with the indication of whether the
operation
// was successful. Tag the request with the memory address of the
call object.
call->response_reader->Read(&call->reply, (void*)call);
// Request that, upon completion of the RPC, "reply" be updated
with the
// server's response; "status" with the indication of whether the
operation
// was successful. Tag the request with the memory address of the
call object.
call->response_reader->Finish(&callCloser->status,
(void*)callCloser); /// When I get finish, I will delete callClosure and
also the call object.
}
// Loop while listening for completed responses.
// Prints out the response from the server.
void AsyncCompleteRpc() {
void* got_tag;
bool ok = false;
// Block until the next result is available in the completion queue
"cq".
while (cq_.Next(&got_tag, &ok)) {
// The tag in this example is the memory location of the call
object
ResponseHandler* responseHandler =
static_cast<ResponseHandler*>(got_tag);
// Verify that the request was completed successfully. Note
that "ok"
// corresponds solely to the request for updates introduced by
Finish().
GPR_ASSERT(ok);
if (responseHandler->RequestSent()) {
responseHandler->HandleResponse();
} else {
std::cout << "Hello Request sent successfully" << std::endl;
responseHandler->SetRequestSent();
}
}
}
private:
class ResponseHandler {
public:
virtual bool HandleResponse() = 0;
virtual bool RequestSent() {return true;} ;
virtual void SetRequestSent() {};
};
// struct for keeping state and data information
class AsyncClientCall: public ResponseHandler {
// Request sent event
bool requestSent;
public:
AsyncClientCall(): requestSent(false) {}
// Container for the data we expect from the server.
HelloReply reply;
// Context for the client. It could be used to convey extra
information to
// the server and/or tweak certain RPC behaviors.
ClientContext context;
// Storage for the status of the RPC upon completion.
Status status;
//std::unique_ptr<ClientAsyncResponseReader<HelloReply>>
response_reader;
std::unique_ptr<ClientAsyncReaderInterface<HelloReply>>
response_reader;
bool HandleResponse() override {
std::cout << "Greeter received: " << reply.message() <<
std::endl;
*response_reader->Read(&reply, (void*)this); *
// Here I read the response and call Read API to get event on
next Read, but the last Read registration will remain as stale entry,the
object gets deleted when server sends Finish, then I get crash on
processing of completion queue
}
bool RequestSent() override {
return requestSent;
}
void SetRequestSent() override {
requestSent = true;
}
};
// struct for keeping state and data information
class AsyncClientCallCloser: public ResponseHandler {
private:
AsyncClientCall *m_call;
public:
AsyncClientCallCloser(AsyncClientCall *call): m_call(call) {}
// Storage for the status of the RPC upon completion.
Status status;
bool HandleResponse() override {
if (status.ok()) {
std::cout << "Server Response Completed" << std::endl;
}
else {
std::cout << "RPC failed" << std::endl;
}
delete m_call;
delete this;
}
};
// Out of the passed in Channel comes the stub, stored here, our view
of the
// server's exposed services.
std::unique_ptr<Greeter::Stub> stub_;
// The producer-consumer queue we use to communicate asynchronously
with the
// gRPC runtime.
CompletionQueue cq_;
};
int main(int argc, char** argv) {
// Instantiate the client. It requires a channel, out of which the
actual RPCs
// are created. This channel models a connection to an endpoint (in
this case,
// localhost at port 50051). We indicate that the channel isn't
authenticated
// (use of InsecureChannelCredentials()).
GreeterClient greeter(grpc::CreateChannel(
"localhost:50051", grpc::InsecureChannelCredentials()));
// Spawn reader thread that loops indefinitely
std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc,
&greeter);
std::string user("world");
greeter.SayHello(user); // The actual RPC call!
std::cout << "Press control-c to quit" << std::endl << std::endl;
thread_.join(); //blocks forever
return 0;
}
*Server Side Code:*
class ServerImpl final {
public:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
// There is no shutdown handling in this code.
void Run() {
std::string server_address("0.0.0.0:50051");
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address,
grpc::InsecureServerCredentials());
// Register "service_" as the instance through which we'll
communicate with
// clients. In this case it corresponds to an *asynchronous*
service.
builder.RegisterService(&service_);
// Get hold of the completion queue used for the asynchronous
communication
// with the gRPC runtime.
cq_ = builder.AddCompletionQueue();
// Finally assemble the server.
server_ = builder.BuildAndStart();
std::cout << "Server listening on " << server_address << std::endl;
// Proceed to the server's main loop.
HandleRpcs();
}
private:
// Class encompasing the state and logic needed to serve a request.
class CallData {
public:
// Take in the "service" instance (in this case representing an
asynchronous
// server) and the completion queue "cq" used for asynchronous
communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), repliesSent_(0),
responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
std::cout << "Creating Call data for new client
connections: " << this << std::endl;
// As part of the initial CREATE state, we *request* that
the system
// start processing SayHello requests. In this request,
"this" acts are
// the tag uniquely identifying the request (so that
different CallData
// instances can serve different requests concurrently), in
this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_,
cq_, cq_,
(void*) this);
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while
we process
// the one for this CallData. The instance will deallocate
itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() +
std::to_string(repliesSent_ + 1));
std::cout << "Sending reponse: " << this << " : " <<
reply_.message() << std::endl;
responder_.Write(reply_, this);
status_ = PROCESSING;
repliesSent_++;
} else if (status_ == PROCESSING) {
if (repliesSent_ == MAX_REPLIES) {
// And we are done! Let the gRPC runtime know we've
finished, using the
// memory address of this instance as the uniquely
identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(Status::OK, this);
} else {
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name() +
std::to_string(repliesSent_ + 1));
std::cout << "Sending reponse: " << this << " : " <<
reply_.message() << std::endl;
responder_.Write(reply_, this);
status_ = PROCESSING;
repliesSent_++;
}
} else {
GPR_ASSERT(status_ == FINISH);
std::cout << "Completed RPC for: " << this << std::endl;
// Once in the FINISH state, deallocate ourselves
(CallData).
delete this;
}
}
private:
// The means of communication with the gRPC runtime for an
asynchronous
// server.
Greeter::AsyncService* service_;
// The producer-consumer queue where for asynchronous server
notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the
use
// of compression, authentication, as well as to send metadata back
to the
// client.
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
uint32_t repliesSent_;
const uint32_t MAX_REPLIES = 5;
// The means to get back to the client.
ServerAsyncWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
CallStatus status_; // The current serving state.
};
// This can be run in multiple threads if needed.
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion
queue. The
// event is uniquely identified by its tag, which in this case
is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This
return value
// tells us whether there is any kind of event or cq_ is
shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
std::unique_ptr<ServerCompletionQueue> cq_;
Greeter::AsyncService service_;
std::unique_ptr<Server> server_;
};
int main(int argc, char** argv) {
ServerImpl server;
server.Run();
return 0;
}
*Output:*
*Server Side:*
kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$
./greeter_async_server
Server listening on 0.0.0.0:50051
Creating Call data for new client connections: 0xb0b5e0
Creating Call data for new client connections: 0xb07970
Sending reponse: 0xb0b5e0 : Hello world1
Sending reponse: 0xb0b5e0 : Hello world2
Sending reponse: 0xb0b5e0 : Hello world3
Sending reponse: 0xb0b5e0 : Hello world4
Sending reponse: 0xb0b5e0 : Hello world5
Completed RPC for: 0xb0b5e0
Client Side:
kuldeep@ubuntu:~/grpc/grpc/examples/cpp/helloworldstream$
./greeter_async_client2
Press control-c to quit
Hello Request sent successfully
Greeter received: Hello world1
Greeter received: Hello world2
Greeter received: Hello world3
Greeter received: Hello world4
Greeter received: Hello world5
Server Response Completed
pure virtual method called
terminate called without an active exception
*Aborted (core dumped)*
(gdb) bt
#0 0x00007ffff6b4bc37 in __GI_raise (sig=sig@entry=6)
at ../nptl/sysdeps/unix/sysv/linux/raise.c:56
#1 0x00007ffff6b4f028 in __GI_abort () at abort.c:89
#2 0x00007ffff7153535 in __gnu_cxx::__verbose_terminate_handler() ()
from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3 0x00007ffff71516d6 in ?? () from
/usr/lib/x86_64-linux-gnu/libstdc++.so.6
#4 0x00007ffff7151703 in std::terminate() ()
from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
*#5 0x00007ffff71521bf in __cxa_pure_virtual ()*
from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#6 0x00007ffff7bb6e83 in grpc::CompletionQueue::AsyncNextInternal(void**,
bool*, gpr_timespec) () from /usr/local/lib/libgrpc++.so.3
#7 0x000000000041368f in grpc::CompletionQueue::Next (this=0x7fffffffde28,
tag=0x7ffff63e7d40, ok=0x7ffff63e7d3f)
at /usr/local/include/grpc++/impl/codegen/completion_queue.h:153
#8 0x0000000000413b83 in GreeterClient::AsyncCompleteRpc
(this=0x7fffffffde20)
at greeter_async_client2.cc:92
#9 0x0000000000415b0b in std::_Mem_fn<void
(GreeterClient::*)()>::operator()<, void>(GreeterClient*) const
(this=0x6258f8, __object=0x7fffffffde20)
at /usr/include/c++/4.8/functional:601
#10 0x0000000000415a5b in std::_Bind_simple<std::_Mem_fn<void
(GreeterClient::*)()>
(GreeterClient*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x6258f0)
at /usr/include/c++/4.8/functional:1732
#11 0x0000000000415963 in std::_Bind_simple<std::_Mem_fn<void
(GreeterClient::*)()> (GreeterClient*)>::operator()() (this=0x6258f0)
at /usr/include/c++/4.8/functional:1720
#12 0x00000000004158fc in
std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void
(GreeterClient::*)()> (GreeterClient*)> >::_M_run() (this=0x6258d8)
at /usr/include/c++/4.8/thread:115
#13 0x00007ffff71a4a60 in ?? () from
/usr/lib/x86_64-linux-gnu/libstdc++.so.6
#14 0x00007ffff63f1184 in start_thread (arg=0x7ffff63e8700)
at pthread_create.c:312
#15 0x00007ffff6c12bed in clone ()
at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
On Wednesday, 26 April 2017 19:39:19 UTC+5:30, William Thurston wrote:
>
> This is how I write similar APIs, with one caveat. I typically complete or
> close with CANCEL the connections randomly somewhere between 15-30 minutes
> after initial request to force my clients to handle reconnection. It also
> helps quite a bit with load balancing and deployments of new server code to
> know your clients are already coded to handle disconnections as a common
> occurrence.
>
> William Thurston
>
> On Apr 26, 2017, at 12:44 AM, Kuldeep Melligeri <[email protected]
> <javascript:>> wrote:
>
> Hi,
>
> I am implementing a service where client is suppose to subscribe for some
> information on server. I am planning to implement with async streaming
> APIs.
> On server when it gets client request for the first time, the server
> starting writing back to client as and when the data is available from
> different source.
> To keep the RPC alive, I am only calling "write" whenever the data is
> available. I am planning to call finish only when server shuts down.
>
> Is this is the general practice for subscription based service? Or is
> there any better way to do this job?
>
> Thanks
> Kuldeep
>
> --
> You received this message because you are subscribed to the Google Groups "
> grpc.io" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected] <javascript:>.
> To post to this group, send email to [email protected] <javascript:>
> .
> Visit this group at https://groups.google.com/group/grpc-io.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/grpc-io/dc37f9f3-7b7b-4f3c-b423-e628d68c8074%40googlegroups.com
>
> <https://groups.google.com/d/msgid/grpc-io/dc37f9f3-7b7b-4f3c-b423-e628d68c8074%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>
>
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/7361edaf-531f-4001-8820-eb097333d247%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.