I originally posted my question at Stack Overflow here 
<https://stackoverflow.com/questions/74036172/how-to-unblock-drain-a-grpccompletionqueue-waiting-on-a-state-change>
 
and here 
<https://stackoverflow.com/questions/74044672/why-does-shutting-down-this-grpccompletionqueue-cause-an-assertion>
.
I later discovered this grpc-specific community, and I thought I might have 
better luck getting help here.

Very quickly, the premise is that I'm working with inherited code that I 
suspect was never tested/exercised. The intent of the code is notify 
listeners when a Channel's state changes. I've created a simpler MVCE (that 
just prints to stdout instead of notifying listeners) to demonstrate my 
question/problem.

The tl;dr of my question is: I have a thread that is waiting on a 
CompletionQueue::Next that is waiting on a Channel::NotifyOnStateChange, 
and I am unclear how to cleanly break the CompletionQueue::Next block.

The first version of my code uses Channel::NotifyOnStateChange() with an 
infinte deadline.
In a separate thread, I call CompletionQueue::Shutdown(): my understanding 
is that this is what is necessary to unblock a CompletionQueue::Next() and 
have it return false.
This does not happen, and I have a weak understanding that this is perhaps 
because the Channel::NotifyOnStateChange that pending in the 
CompletionQueue has an infinite deadline, and therefore never "completes" 
its life-cycle in the CompletionQueue if there is no channel state change, 
which is a realistic possibility.

The second version of my code uses Channel::NotifyOnStateChange() with a 
non-infinite deadline.
With this version of code, I do observe the CompletionQueue::Next() 
periodically unblocking.
So in my other thread, I call CompletionQueue::Shutdown(), expecting that 
is what is correct and necessary to unblock the CompletionQueue::Next().
But this results in my executable exiting non-gracefully; the following is 
output to stdout:
E1012 15:29:07.677225824      54 channel_connectivity.cc:234] assertion 
failed: grpc_cq_begin_op(cq, tag)
Aborted (core dumped)
I don't understand what the error is here.

I'd be grateful for help from the community.
My question is ultimately: *how do I cleanly exit a loop waiting on a 
CompletionQueue::Next() waiting on a Channel::NotifyOnStateChange() ?*

I have a preference for the NotifyOnStateChange to have an infinite 
deadline because the original code that I'm working with does so, and I'd 
like to make minimal changes, on the assumption that the original author 
made informed and deliberate choices.

My code is below:
================



























































*// main.cpp - Channel::NotifyOnStateChange with infinite timeout#include 
<iostream>#include <memory>#include <thread>#include 
<grpcpp/grpcpp.h>#include <unistd.h>using namespace std;using namespace 
grpc;void threadFunc(shared_ptr<Channel> ch, CompletionQueue* cq) {  void* 
tag = NULL;  bool ok = false;  int i = 1;  grpc_connectivity_state state = 
ch->GetState(false);  cout << "state " << i++ << " = " << (int)state << 
endl;  ch->NotifyOnStateChange(state,                          
gpr_inf_future(GPR_CLOCK_MONOTONIC),                          cq,          
                (void*)1);  while (cq->Next(&tag, &ok)) {    state = 
ch->GetState(false);    cout << "state " << i++ << " = " << (int)state << 
endl;    ch->NotifyOnStateChange(state,                            
gpr_inf_future(GPR_CLOCK_MONOTONIC),                            cq,        
                    (void*)1);  }  cout << "thread end" << endl;}int 
main(int argc, char* argv[]) {  ChannelArguments channel_args;  
CompletionQueue cq;  
channel_args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);  
channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);  
channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);  
channel_args.SetInt(GRPC_ARG_HTTP2_BDP_PROBE, 0);  
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);  
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 30000);  
channel_args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS,  
                    60000);  {    shared_ptr<Channel> 
ch(CreateCustomChannel("my_grpc_server:50051",                              
                 InsecureChannelCredentials(),                              
                 channel_args));    std::thread my_thread(&threadFunc, ch, 
&cq);    cout << "sleeping" << endl;    sleep(5);    cout << "slept" << 
endl;    cq.Shutdown();    cout << "shut down cq" << endl;    
my_thread.join();  }}*
 
================
================
================

*// main.cpp*  *- Channel::NotifyOnStateChange with finite timeout*



































































*#include <chrono>#include <iostream>#include <memory>#include 
<thread>#include <grpcpp/grpcpp.h>#include <unistd.h>using namespace 
std;using namespace grpc;void threadFunc(shared_ptr<Channel> ch, 
CompletionQueue* cq) {  void* tag = NULL;  bool ok = false;  int i = 1;  
grpc_connectivity_state state = ch->GetState(false);  
std::chrono::time_point<std::chrono::system_clock> now =        
std::chrono::system_clock::now();  
std::chrono::time_point<std::chrono::system_clock> deadline =    now + 
std::chrono::seconds(2);  cout << "state " << i++ << " = " << (int)state << 
endl;  ch->NotifyOnStateChange(state,                          
//gpr_inf_future(GPR_CLOCK_MONOTONIC),                          deadline,  
                        cq,                          (void*)1);  while 
(cq->Next(&tag, &ok)) {    state = ch->GetState(false);    cout << "state " 
<< i++ << " = " << (int)state << endl;    now = 
std::chrono::system_clock::now();    deadline = now + 
std::chrono::seconds(2);    ch->NotifyOnStateChange(state,                  
          //gpr_inf_future(GPR_CLOCK_MONOTONIC),                            
deadline,                            cq,                            
(void*)1);  }  cout << "thread end" << endl;}int main(int argc, char* 
argv[]) {  ChannelArguments channel_args;  CompletionQueue cq;  
channel_args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);  
channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);  
channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000);  
channel_args.SetInt(GRPC_ARG_HTTP2_BDP_PROBE, 0);  
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);  
channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 30000);  
channel_args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS,  
                    60000);  {    shared_ptr<Channel> 
ch(CreateCustomChannel("my_grpc_server:50051",                              
                 InsecureChannelCredentials(),                              
                 channel_args));    std::thread my_thread(&threadFunc, ch, 
&cq);    cout << "sleeping" << endl;    sleep(5);    cout << "slept" << 
endl;    cq.Shutdown();    cout << "shut down cq" << endl;    
my_thread.join();  }}*

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to grpc-io+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/c27cb995-04d3-48a3-99da-6321958a29a9n%40googlegroups.com.

Reply via email to