Hi there, "ok" is documented at https://github.com/grpc/grpc/blob/master/include/grpc%2B%2B/impl/codegen/completion_queue.h#L124 .
Finish will always give an ok of 1 since it's always ok to be done with your RPC and check its status. When starting an RPC, ok will give 0 only if gRPC is sure that that RPC will never actually work (see the comments for more detail): the ok of 1 indicates that gRPC is going to send this RPC initiation out on the wire. It would be 0 if the channel was permanently broken or the fail-fast option is set; the call could still end up failing even if the ok were 1 at the initiation time. Hope that helps! - Vijay On Wed, Feb 7, 2018 at 1:16 PM <[email protected]> wrote: > > I started learning grpc very recently, and followed the docs/instructions > and discussions online to build a toy program of an asynchronous streaming > RPC client (.proto and client.cc are attached below). *My grpc version is > 1.8.4.* > > My main idea for this client is to create a "listener" in a separate > thread receiving the streaming response from server, so once a request is > sent, my later-extended program will proceed in the main thread without > being blocked. If server is not accessible (either when the client starts > or the connection drops in the middle), an alert email will be sent. I > don't want this is over alerting but just that each time the RPC connection > drops. Then client will try to reconnect every 5 seconds. > > Now, I have a "state machine" in HandleResponse() to handle the events. > As a basic test, even without a server, I should be able to start my > client, got RPC failure, send email and try to reconnect ... > The following output (*Expected*) confirms the expectation. And if my > understanding is correct, the first cq returned ok being 0 is because the > RPC failure event in the cq; while the following cq returned ok being 1 is > to acknowledge my Finish() request fired in "case Finish:" block in > HandleResponse(). Correct? > > However, sometime the output (*Not Expected*) has additional event at the > beginning (lines highlighted in light red). Cq returned ok is 1, which > makes the state machine consider it's "connected" and turn on the email > alert. When this happens, multiple emails will be sent which is not what I > expect (it should only send the second email when it connects then drops). > > This turns out to happen randomly. From my observation, it's less likely > (1 out of 10 tries) hitting the "not expected" case when client is pinging > localhost while the likelihood is higher when pinging a remote host. > > My questions: > 1) What's the first event in the cq? From the tag address, it's the same > AsyncClientCall object but shouldn't be only two events: 1) ok=0 (rpc > failure) and 2) ok=1 as ack of Finish()? > 2) Is the cq the middle man between server and client that all > client-server communication messages are going to be registered in cq? Is > there a race condition here between server's response and client's actions: > send request, Read, Finish, etc? > > Thanks in advance! > > > *Expected* > $ ./hw_cli > ctor of GreeterClient > ----@@ ASYNCComplete on separate thread!! > Created new AsyncClientCall at: 0x1774440 > ## SAYING hello!! > Press control-c to quit > > dtor of GreeterClient > Tag received: 0x1774440, Completion queue/Next() returned: 0 > HandleEachResponse_CREATE_BAD > Tag received: 0x1774440, Completion queue/Next() returned: 1 > HandleEachResponse_FINISH_BAD > RPC failed > *********dtor AsyncClientCall > Reconnecting ... > > *Not Expected* > ./hw_cli > ctor of GreeterClient > ----@@ ASYNCComplete on separate thread!! > Created new AsyncClientCall at: 0x2419440 > ## SAYING hello!! > Press control-c to quit > > dtor of GreeterClient > Tag received: 0x2419440, Completion queue/Next() returned: 1 > HandleEachResponse_CREATE_GOOD > Tag received: 0x2419440, Completion queue/Next() returned: 0 > HandleEachResponse_PROCESS_BAD > Tag received: 0x2419440, Completion queue/Next() returned: 1 > HandleEachResponse_FINISH_BAD > RPC failed > *********dtor AsyncClientCall > Reconnecting ... > > > *helloworld.proto* > > syntax = "proto3"; > package helloworld; > > // The greeting service definition. > service Greeter { > // Sends a greeting > rpc SayHello (HelloRequest) returns (stream HelloReply) {} > } > > // The request message containing the user's name. > message HelloRequest { > string name = 1; > } > > // The response message containing the greetings > message HelloReply { > string message = 1; > } > > *client.cc* > #include <iostream> > #include <memory> > #include <string> > #include <thread> > > #include <grpc++/grpc++.h> > #include "helloworld.grpc.pb.h" > > using grpc::Channel; > using grpc::ChannelArguments; > using grpc::ClientContext; > using grpc::Status; > using grpc::ClientAsyncReaderInterface; > using grpc::CompletionQueue; > > using helloworld::HelloRequest; > using helloworld::HelloReply; > using helloworld::Greeter; > > class GreeterClient { > public: > GreeterClient() : email_alert(true) > { > std::cout << "ctor of GreeterClient" << std::endl; > > // Customize backoff arguments. > int backoff_ms = 5000; > grpc::ChannelArguments ch_args; > ch_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, backoff_ms); > ch_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, backoff_ms); > ch_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, backoff_ms); > > // Create a custom channel and attach it to the stub. > stub_ = Greeter::NewStub(grpc::CreateCustomChannel("localhost:50051", > grpc::InsecureChannelCredentials(), ch_args)); > // stub_ = Greeter::NewStub(grpc::CreateCustomChannel("REMOTE_HOST:50051", > grpc::InsecureChannelCredentials(), ch_args)); > > // Spawn a thread listening to stream asynchronously. > thread_listener_ = > std::thread(&GreeterClient::AsyncCompleteRpc, this); > } > > ~GreeterClient(void) > { > std::cout << "dtor of GreeterClient" << std::endl; > if (thread_listener_.joinable()) thread_listener_.join(); > std::cout << "thread joined" << std::endl; > } > > // Assembles the client's payload and sends it to the server. > void SayHello(void) > { > HelloRequest request; > // Data we are sending to the server. > request.set_name("World"); > > // Call object to store rpc data > AsyncClientCall* call = new AsyncClientCall; > std::cout << "Created new AsyncClientCall at: " << call << std::endl; > > call->response_reader = stub_->AsyncSayHello(&call->context, > request, &cq_, (void *)call); > std::cout << "## SAYING hello!!" << std::endl; > } > > // Loop while listening for completed responses. > // Prints out the response from the server. > void AsyncCompleteRpc() > { > void* got_tag; > bool ok = false; > std::cout << "----@@ ASYNCComplete on separate thread!!" << std::endl; > > // Block until the next result is available in the completion > queue "cq". > while (cq_.Next(&got_tag, &ok)) > { > // The tag in this example is the memory location of the > call object > AsyncClientCall* ac = > static_cast<AsyncClientCall*>(got_tag); > std::cout << "Tag received: " << ac << ", Completion > queue/Next() returned: " << ok << std::endl; > > // Verify that the request was completed successfully. > Note that "ok" corresponds solely to the request for updates introduced by > Finish(). > if (! ac->HandleResponse(ok, email_alert)) > { > if (email_alert) > { > std::string cmd { "echo | mail -s \"Test HelloWorld service is down.\" > [email protected]" }; > system(cmd.c_str()); > email_alert = false; > } > delete ac; > std::cout << "Reconnecting ..." << std::endl; > SayHello(); > } > } > } > > private: > bool email_alert; > std::thread thread_listener_; > > // struct for keeping state and data information > struct AsyncClientCall > { > enum CallStatus {CREATE, PROCESS, FINISH}; > CallStatus callStatus_; > > Status status; > HelloReply reply; > ClientContext context; > std::unique_ptr<ClientAsyncReaderInterface<HelloReply>> > response_reader; > > AsyncClientCall(): callStatus_(CREATE) {} > > ~AsyncClientCall() > { > std::cout << "*********dtor AsyncClientCall" << std::endl; > } > > bool HandleResponse(bool responseStatus, bool& email_alert) > { > switch (callStatus_) > { > case CREATE: > if (responseStatus) { > std::cout << "HandleEachResponse_CREATE_GOOD" << std::endl; > response_reader->Read(&reply, (void*)this); > callStatus_ = PROCESS; > email_alert = true; > } else { > std::cout << "HandleEachResponse_CREATE_BAD" << std::endl; > response_reader->Finish(&status, (void*)this); > callStatus_ = FINISH; > } > break; > case PROCESS: > if (responseStatus) { > std::cout << "HandleEachResponse_PROCESS_GOOD" << std::endl; > std::cout << "PROCESS_GOOD: Greeter received: " << > this << " : " << reply.message() << std::endl; > response_reader->Read(&reply, (void*)this); > } else { > std::cout << "HandleEachResponse_PROCESS_BAD" << std::endl; > response_reader->Finish(&status, (void*)this); > callStatus_ = FINISH; > } > break; > case FINISH: > if (status.ok()) { > std::cout << "HandleEachResponse_FINISH_GOOD" << std::endl; > std::cout << "Server Response Completed: " << this > << " CallData: " << this << std::endl; > } > else { > std::cout << "HandleEachResponse_FINISH_BAD" << std::endl; > std::cout << "RPC failed" << std::endl; > return false; > } > break; > default: > break; > } > return true; > } > }; > > std::unique_ptr<Greeter::Stub> stub_; > > CompletionQueue cq_; > }; > > int main(int argc, char** argv) { > GreeterClient greeter; > greeter.SayHello(); > > std::cout << "Press control-c to quit" << std::endl << std::endl; > > return 0; > } > > -- > You received this message because you are subscribed to the Google Groups " > grpc.io" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/grpc-io. > To view this discussion on the web visit > https://groups.google.com/d/msgid/grpc-io/663935fd-46a5-4445-aab0-d11164aec264%40googlegroups.com > <https://groups.google.com/d/msgid/grpc-io/663935fd-46a5-4445-aab0-d11164aec264%40googlegroups.com?utm_medium=email&utm_source=footer> > . > For more options, visit https://groups.google.com/d/optout. > -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/grpc-io. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/CADEy0h1J3D4NQGaiM0LbDwCm4AA2QqWxgwKkZuaXsLdPeu%3DwxA%40mail.gmail.com. For more options, visit https://groups.google.com/d/optout.
smime.p7s
Description: S/MIME Cryptographic Signature
