I started learning grpc very recently, and followed the docs/instructions
and discussions online to build a toy program of an asynchronous streaming
RPC client (.proto and client.cc are attached below). *My grpc version is
1.8.4.*
My main idea for this client is to create a "listener" in a separate thread
receiving the streaming response from server, so once a request is sent, my
later-extended program will proceed in the main thread without being
blocked. If server is not accessible (either when the client starts or the
connection drops in the middle), an alert email will be sent. I don't want
this is over alerting but just that each time the RPC connection drops.
Then client will try to reconnect every 5 seconds.
Now, I have a "state machine" in HandleResponse() to handle the events. As
a basic test, even without a server, I should be able to start my client,
got RPC failure, send email and try to reconnect ...
The following output (*Expected*) confirms the expectation. And if my
understanding is correct, the first cq returned ok being 0 is because the
RPC failure event in the cq; while the following cq returned ok being 1 is
to acknowledge my Finish() request fired in "case Finish:" block in
HandleResponse(). Correct?
However, sometime the output (*Not Expected*) has additional event at the
beginning (lines highlighted in light red). Cq returned ok is 1, which
makes the state machine consider it's "connected" and turn on the email
alert. When this happens, multiple emails will be sent which is not what I
expect (it should only send the second email when it connects then drops).
This turns out to happen randomly. From my observation, it's less likely (1
out of 10 tries) hitting the "not expected" case when client is pinging
localhost while the likelihood is higher when pinging a remote host.
My questions:
1) What's the first event in the cq? From the tag address, it's the same
AsyncClientCall object but shouldn't be only two events: 1) ok=0 (rpc
failure) and 2) ok=1 as ack of Finish()?
2) Is the cq the middle man between server and client that all
client-server communication messages are going to be registered in cq? Is
there a race condition here between server's response and client's actions:
send request, Read, Finish, etc?
Thanks in advance!
*Expected*
$ ./hw_cli
ctor of GreeterClient
----@@ ASYNCComplete on separate thread!!
Created new AsyncClientCall at: 0x1774440
## SAYING hello!!
Press control-c to quit
dtor of GreeterClient
Tag received: 0x1774440, Completion queue/Next() returned: 0
HandleEachResponse_CREATE_BAD
Tag received: 0x1774440, Completion queue/Next() returned: 1
HandleEachResponse_FINISH_BAD
RPC failed
*********dtor AsyncClientCall
Reconnecting ...
*Not Expected*
./hw_cli
ctor of GreeterClient
----@@ ASYNCComplete on separate thread!!
Created new AsyncClientCall at: 0x2419440
## SAYING hello!!
Press control-c to quit
dtor of GreeterClient
Tag received: 0x2419440, Completion queue/Next() returned: 1
HandleEachResponse_CREATE_GOOD
Tag received: 0x2419440, Completion queue/Next() returned: 0
HandleEachResponse_PROCESS_BAD
Tag received: 0x2419440, Completion queue/Next() returned: 1
HandleEachResponse_FINISH_BAD
RPC failed
*********dtor AsyncClientCall
Reconnecting ...
*helloworld.proto*
syntax = "proto3";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
*client.cc*
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <grpc++/grpc++.h>
#include "helloworld.grpc.pb.h"
using grpc::Channel;
using grpc::ChannelArguments;
using grpc::ClientContext;
using grpc::Status;
using grpc::ClientAsyncReaderInterface;
using grpc::CompletionQueue;
using helloworld::HelloRequest;
using helloworld::HelloReply;
using helloworld::Greeter;
class GreeterClient {
public:
GreeterClient() : email_alert(true)
{
std::cout << "ctor of GreeterClient" << std::endl;
// Customize backoff arguments.
int backoff_ms = 5000;
grpc::ChannelArguments ch_args;
ch_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, backoff_ms);
ch_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, backoff_ms);
ch_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, backoff_ms);
// Create a custom channel and attach it to the stub.
stub_ = Greeter::NewStub(grpc::CreateCustomChannel("localhost:50051",
grpc::InsecureChannelCredentials(), ch_args));
// stub_ = Greeter::NewStub(grpc::CreateCustomChannel("REMOTE_HOST:50051",
grpc::InsecureChannelCredentials(), ch_args));
// Spawn a thread listening to stream asynchronously.
thread_listener_ =
std::thread(&GreeterClient::AsyncCompleteRpc, this);
}
~GreeterClient(void)
{
std::cout << "dtor of GreeterClient" << std::endl;
if (thread_listener_.joinable()) thread_listener_.join();
std::cout << "thread joined" << std::endl;
}
// Assembles the client's payload and sends it to the server.
void SayHello(void)
{
HelloRequest request;
// Data we are sending to the server.
request.set_name("World");
// Call object to store rpc data
AsyncClientCall* call = new AsyncClientCall;
std::cout << "Created new AsyncClientCall at: " << call << std::endl;
call->response_reader = stub_->AsyncSayHello(&call->context,
request, &cq_, (void *)call);
std::cout << "## SAYING hello!!" << std::endl;
}
// Loop while listening for completed responses.
// Prints out the response from the server.
void AsyncCompleteRpc()
{
void* got_tag;
bool ok = false;
std::cout << "----@@ ASYNCComplete on separate thread!!" << std::endl;
// Block until the next result is available in the completion
queue "cq".
while (cq_.Next(&got_tag, &ok))
{
// The tag in this example is the memory location of the
call object
AsyncClientCall* ac =
static_cast<AsyncClientCall*>(got_tag);
std::cout << "Tag received: " << ac << ", Completion
queue/Next() returned: " << ok << std::endl;
// Verify that the request was completed successfully. Note
that "ok" corresponds solely to the request for updates introduced by
Finish().
if (! ac->HandleResponse(ok, email_alert))
{
if (email_alert)
{
std::string cmd { "echo | mail -s \"Test HelloWorld service is down.\"
[email protected]" };
system(cmd.c_str());
email_alert = false;
}
delete ac;
std::cout << "Reconnecting ..." << std::endl;
SayHello();
}
}
}
private:
bool email_alert;
std::thread thread_listener_;
// struct for keeping state and data information
struct AsyncClientCall
{
enum CallStatus {CREATE, PROCESS, FINISH};
CallStatus callStatus_;
Status status;
HelloReply reply;
ClientContext context;
std::unique_ptr<ClientAsyncReaderInterface<HelloReply>>
response_reader;
AsyncClientCall(): callStatus_(CREATE) {}
~AsyncClientCall()
{
std::cout << "*********dtor AsyncClientCall" << std::endl;
}
bool HandleResponse(bool responseStatus, bool& email_alert)
{
switch (callStatus_)
{
case CREATE:
if (responseStatus) {
std::cout << "HandleEachResponse_CREATE_GOOD" << std::endl;
response_reader->Read(&reply, (void*)this);
callStatus_ = PROCESS;
email_alert = true;
} else {
std::cout << "HandleEachResponse_CREATE_BAD" << std::endl;
response_reader->Finish(&status, (void*)this);
callStatus_ = FINISH;
}
break;
case PROCESS:
if (responseStatus) {
std::cout << "HandleEachResponse_PROCESS_GOOD" << std::endl;
std::cout << "PROCESS_GOOD: Greeter received: " <<
this << " : " << reply.message() << std::endl;
response_reader->Read(&reply, (void*)this);
} else {
std::cout << "HandleEachResponse_PROCESS_BAD" << std::endl;
response_reader->Finish(&status, (void*)this);
callStatus_ = FINISH;
}
break;
case FINISH:
if (status.ok()) {
std::cout << "HandleEachResponse_FINISH_GOOD" << std::endl;
std::cout << "Server Response Completed: " << this
<< " CallData: " << this << std::endl;
}
else {
std::cout << "HandleEachResponse_FINISH_BAD" << std::endl;
std::cout << "RPC failed" << std::endl;
return false;
}
break;
default:
break;
}
return true;
}
};
std::unique_ptr<Greeter::Stub> stub_;
CompletionQueue cq_;
};
int main(int argc, char** argv) {
GreeterClient greeter;
greeter.SayHello();
std::cout << "Press control-c to quit" << std::endl << std::endl;
return 0;
}
--
You received this message because you are subscribed to the Google Groups
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/grpc-io.
To view this discussion on the web visit
https://groups.google.com/d/msgid/grpc-io/663935fd-46a5-4445-aab0-d11164aec264%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.