Hi there,
"ok" is documented at
https://github.com/grpc/grpc/blob/master/include/grpc%2B%2B/impl/codegen/completion_queue.h#L124

Finish will always give an ok of 1 since it's always ok to be done with
your RPC and check its status.

When starting an RPC, ok will be 0 only if gRPC is sure that the RPC will
never actually work (see the comments for more detail). An ok of 1
indicates that gRPC is going to send this RPC initiation out on the wire.
It would be 0 if the channel were permanently broken, or transiently broken
with the fail-fast option set. The call could still end up failing even if
ok was 1 at initiation time. That is the extra ok of 1 at the start of your
"Not Expected" output: the initiation went ahead, and the failure only
showed up on the Read that followed.

Hope that helps!

- Vijay


On Wed, Feb 7, 2018 at 1:16 PM <[email protected]> wrote:

>
> I started learning grpc very recently, and followed the docs/instructions
> and discussions online to build a toy program of an asynchronous streaming
> RPC client (.proto and client.cc are attached below). *My grpc version is
> 1.8.4.*
>
> My main idea for this client is to create a "listener" in a separate
> thread that receives the streaming response from the server, so that once
> a request is sent, my later-extended program can proceed in the main
> thread without being blocked. If the server is not accessible (either when
> the client starts or when the connection drops in the middle), an alert
> email will be sent. I don't want to over-alert: there should be just one
> email each time the RPC connection drops. The client will then try to
> reconnect every 5 seconds.
>
> Now, I have a "state machine" in HandleResponse() to handle the events.
> As a basic test, even without a server, I should be able to start my
> client, get an RPC failure, send an email, and try to reconnect.
> The following output (*Expected*) confirms the expectation. If my
> understanding is correct, the first cq event returns ok of 0 because of
> the RPC failure, while the following event returns ok of 1 to acknowledge
> my Finish() request, which is handled in the "case FINISH:" block in
> HandleResponse(). Correct?
>
> However, sometimes the output (*Not Expected*) has an additional event at
> the beginning (the first "Tag received" line and the CREATE_GOOD line after
> it). The cq returns ok of 1, which makes the state machine consider it
> "connected" and turn on the email alert. When this happens, multiple emails
> are sent, which is not what I expect (it should only send a second email
> when it connects and then drops).
>
> This happens randomly. From my observation, the "not expected" case is
> less likely (1 out of 10 tries) when the client is pinging localhost, and
> more likely when pinging a remote host.
>
> My questions:
> 1) What's the first event in the cq? From the tag address, it's the same
> AsyncClientCall object, but shouldn't there be only two events: a) ok=0
> (RPC failure) and b) ok=1 as the ack of Finish()?
> 2) Is the cq the middleman between server and client, so that all
> client-server communication messages are registered in the cq? Is there a
> race condition here between the server's response and the client's actions
> (send request, Read, Finish, etc.)?
>
> Thanks in advance!
>
>
> *Expected*
> $ ./hw_cli
> ctor of GreeterClient
> ----@@ ASYNCComplete on separate thread!!
> Created new AsyncClientCall at: 0x1774440
> ## SAYING hello!!
> Press control-c to quit
>
> dtor of GreeterClient
> Tag received: 0x1774440, Completion queue/Next() returned: 0
> HandleEachResponse_CREATE_BAD
> Tag received: 0x1774440, Completion queue/Next() returned: 1
> HandleEachResponse_FINISH_BAD
> RPC failed
> *********dtor AsyncClientCall
> Reconnecting ...
>
> *Not Expected*
> ./hw_cli
> ctor of GreeterClient
> ----@@ ASYNCComplete on separate thread!!
> Created new AsyncClientCall at: 0x2419440
> ## SAYING hello!!
> Press control-c to quit
>
> dtor of GreeterClient
> Tag received: 0x2419440, Completion queue/Next() returned: 1
> HandleEachResponse_CREATE_GOOD
> Tag received: 0x2419440, Completion queue/Next() returned: 0
> HandleEachResponse_PROCESS_BAD
> Tag received: 0x2419440, Completion queue/Next() returned: 1
> HandleEachResponse_FINISH_BAD
> RPC failed
> *********dtor AsyncClientCall
> Reconnecting ...
>
>
> *helloworld.proto*
>
> syntax = "proto3";
> package helloworld;
>
> // The greeting service definition.
> service Greeter {
>   // Sends a greeting
>   rpc SayHello (HelloRequest) returns (stream HelloReply) {}
> }
>
> // The request message containing the user's name.
> message HelloRequest {
>   string name = 1;
> }
>
> // The response message containing the greetings
> message HelloReply {
>   string message = 1;
> }
>
> *client.cc*
> #include <cstdlib>  // for system()
> #include <iostream>
> #include <memory>
> #include <string>
> #include <thread>
>
> #include <grpc++/grpc++.h>
> #include "helloworld.grpc.pb.h"
>
> using grpc::Channel;
> using grpc::ChannelArguments;
> using grpc::ClientContext;
> using grpc::Status;
> using grpc::ClientAsyncReaderInterface;
> using grpc::CompletionQueue;
>
> using helloworld::HelloRequest;
> using helloworld::HelloReply;
> using helloworld::Greeter;
>
> class GreeterClient {
> public:
>         GreeterClient() : email_alert(true)
> {
> std::cout << "ctor of GreeterClient" << std::endl;
>
> // Customize backoff arguments.
> int backoff_ms = 5000;
> grpc::ChannelArguments ch_args;
> ch_args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, backoff_ms);
> ch_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS,     backoff_ms);
> ch_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS,     backoff_ms);
>
> // Create a custom channel and attach it to the stub.
> stub_ = Greeter::NewStub(grpc::CreateCustomChannel("localhost:50051",
> grpc::InsecureChannelCredentials(), ch_args));
> // stub_ = Greeter::NewStub(grpc::CreateCustomChannel("REMOTE_HOST:50051",
> //     grpc::InsecureChannelCredentials(), ch_args));
>
> // Spawn a thread listening to stream asynchronously.
>             thread_listener_ =
> std::thread(&GreeterClient::AsyncCompleteRpc, this);
> }
>
> ~GreeterClient(void)
> {
> std::cout << "dtor of GreeterClient" << std::endl;
> if (thread_listener_.joinable()) thread_listener_.join();
> std::cout << "thread joined" << std::endl;
> }
>
>         // Assembles the client's payload and sends it to the server.
>         void SayHello(void)
> {
>             HelloRequest request;
>             // Data we are sending to the server.
>             request.set_name("World");
>
>             // Call object to store rpc data
>             AsyncClientCall* call = new AsyncClientCall;
>     std::cout << "Created new AsyncClientCall at: " << call << std::endl;
>
>             call->response_reader = stub_->AsyncSayHello(&call->context,
> request, &cq_, (void *)call);
>     std::cout << "## SAYING hello!!" << std::endl;
>         }
>
>         // Loop while listening for completed responses.
>         // Prints out the response from the server.
>         void AsyncCompleteRpc()
> {
>             void* got_tag;
>             bool ok = false;
>     std::cout << "----@@ ASYNCComplete on separate thread!!" << std::endl;
>
>             // Block until the next result is available in the completion
>             // queue "cq".
>             while (cq_.Next(&got_tag, &ok))
>     {
>                 // The tag in this example is the memory location of the
>                 // call object.
>                 AsyncClientCall* ac =
>                     static_cast<AsyncClientCall*>(got_tag);
>                 std::cout << "Tag received: " << ac
>                           << ", Completion queue/Next() returned: " << ok
>                           << std::endl;
>
>                 // Verify that the request was completed successfully.
>                 // Note that "ok" corresponds solely to the request for
>                 // updates introduced by Finish().
>                 if (! ac->HandleResponse(ok, email_alert))
> {
>     if (email_alert)
>     {
> std::string cmd { "echo | mail -s \"Test HelloWorld service is down.\" [email protected]" };
> system(cmd.c_str());
> email_alert = false;
>     }
>                     delete ac;
>     std::cout << "Reconnecting ..." << std::endl;
>          SayHello();
> }
>             }
>         }
>
> private:
> bool email_alert;
> std::thread thread_listener_;
>
>     // struct for keeping state and data information
>         struct AsyncClientCall
> {
>             enum CallStatus {CREATE, PROCESS, FINISH};
>             CallStatus callStatus_;
>
>             Status status;
>             HelloReply reply;
>             ClientContext context;
>             std::unique_ptr<ClientAsyncReaderInterface<HelloReply>>
> response_reader;
>
>             AsyncClientCall(): callStatus_(CREATE) {}
>
>             ~AsyncClientCall()
>     {
> std::cout << "*********dtor AsyncClientCall" << std::endl;
>     }
>
>             bool HandleResponse(bool responseStatus, bool& email_alert)
>          {
>                 switch (callStatus_)
> {
>                 case CREATE:
>                     if (responseStatus) {
> std::cout << "HandleEachResponse_CREATE_GOOD" << std::endl;
>                         response_reader->Read(&reply, (void*)this);
>                         callStatus_ = PROCESS;
> email_alert = true;
>                     } else {
> std::cout << "HandleEachResponse_CREATE_BAD" << std::endl;
>                         response_reader->Finish(&status, (void*)this);
>                         callStatus_ = FINISH;
>                     }
>                     break;
>                 case PROCESS:
>                     if (responseStatus) {
> std::cout << "HandleEachResponse_PROCESS_GOOD" << std::endl;
>                         std::cout << "PROCESS_GOOD: Greeter received: " <<
> this << " : " << reply.message() << std::endl;
>                         response_reader->Read(&reply, (void*)this);
>                     } else {
>         std::cout << "HandleEachResponse_PROCESS_BAD" << std::endl;
>                         response_reader->Finish(&status, (void*)this);
>                         callStatus_ = FINISH;
>                     }
>                     break;
>                 case FINISH:
>                     if (status.ok()) {
>   std::cout << "HandleEachResponse_FINISH_GOOD" << std::endl;
>                         std::cout << "Server Response Completed: " << this
> << " CallData: " << this << std::endl;
>                     }
>                     else {
> std::cout << "HandleEachResponse_FINISH_BAD" << std::endl;
>                         std::cout << "RPC failed" << std::endl;
> return false;
>                     }
>                     break;
> default:
>     break;
>                 }
>     return true;
>         }
>     };
>
>     std::unique_ptr<Greeter::Stub> stub_;
>
>     CompletionQueue cq_;
> };
>
> int main(int argc, char** argv) {
>     GreeterClient greeter;
> greeter.SayHello();
>
>     std::cout << "Press control-c to quit" << std::endl << std::endl;
>
>     return 0;
> }
>
> --
> You received this message because you are subscribed to the Google Groups "
> grpc.io" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/grpc-io.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/grpc-io/663935fd-46a5-4445-aab0-d11164aec264%40googlegroups.com
> <https://groups.google.com/d/msgid/grpc-io/663935fd-46a5-4445-aab0-d11164aec264%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>
