I originally posted my question at Stack Overflow here <https://stackoverflow.com/questions/74036172/how-to-unblock-drain-a-grpccompletionqueue-waiting-on-a-state-change> and here <https://stackoverflow.com/questions/74044672/why-does-shutting-down-this-grpccompletionqueue-cause-an-assertion> . I later discovered this grpc-specific community, and I thought I might have better luck getting help here.
Very quickly, the premise is that I'm working with inherited code that I suspect was never tested/exercised. The intent of the code is notify listeners when a Channel's state changes. I've created a simpler MVCE (that just prints to stdout instead of notifying listeners) to demonstrate my question/problem. The tl;dr of my question is: I have a thread that is waiting on a CompletionQueue::Next that is waiting on a Channel::NotifyOnStateChange, and I am unclear how to cleanly break the CompletionQueue::Next block. The first version of my code uses Channel::NotifyOnStateChange() with an infinte deadline. In a separate thread, I call CompletionQueue::Shutdown(): my understanding is that this is what is necessary to unblock a CompletionQueue::Next() and have it return false. This does not happen, and I have a weak understanding that this is perhaps because the Channel::NotifyOnStateChange that pending in the CompletionQueue has an infinite deadline, and therefore never "completes" its life-cycle in the CompletionQueue if there is no channel state change, which is a realistic possibility. The second version of my code uses Channel::NotifyOnStateChange() with a non-infinite deadline. With this version of code, I do observe the CompletionQueue::Next() periodically unblocking. So in my other thread, I call CompletionQueue::Shutdown(), expecting that is what is correct and necessary to unblock the CompletionQueue::Next(). But this results in my executable exiting non-gracefully; the following is output to stdout: E1012 15:29:07.677225824 54 channel_connectivity.cc:234] assertion failed: grpc_cq_begin_op(cq, tag) Aborted (core dumped) I don't understand what the error is here. I'd be grateful for help from the community. My question is ultimately: *how do I cleanly exit a loop waiting on a CompletionQueue::Next() waiting on a Channel::NotifyOnStateChange() ?* I have a preference for the NotifyOnStateChange to have an infinite deadline because the original code that I'm working with does so, and I'd like to make minimal changes, on the assumption that the original author made informed and deliberate choices. My code is below: ================ *// main.cpp - Channel::NotifyOnStateChange with infinite timeout#include <iostream>#include <memory>#include <thread>#include <grpcpp/grpcpp.h>#include <unistd.h>using namespace std;using namespace grpc;void threadFunc(shared_ptr<Channel> ch, CompletionQueue* cq) { void* tag = NULL; bool ok = false; int i = 1; grpc_connectivity_state state = ch->GetState(false); cout << "state " << i++ << " = " << (int)state << endl; ch->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_MONOTONIC), cq, (void*)1); while (cq->Next(&tag, &ok)) { state = ch->GetState(false); cout << "state " << i++ << " = " << (int)state << endl; ch->NotifyOnStateChange(state, gpr_inf_future(GPR_CLOCK_MONOTONIC), cq, (void*)1); } cout << "thread end" << endl;}int main(int argc, char* argv[]) { ChannelArguments channel_args; CompletionQueue cq; channel_args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000); channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000); channel_args.SetInt(GRPC_ARG_HTTP2_BDP_PROBE, 0); channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000); channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 30000); channel_args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 60000); { shared_ptr<Channel> ch(CreateCustomChannel("my_grpc_server:50051", InsecureChannelCredentials(), channel_args)); std::thread my_thread(&threadFunc, ch, &cq); cout << "sleeping" << endl; sleep(5); cout << "slept" << endl; cq.Shutdown(); cout << "shut down cq" << endl; my_thread.join(); }}* ================ ================ ================ *// main.cpp* *- Channel::NotifyOnStateChange with finite timeout* *#include <chrono>#include <iostream>#include <memory>#include <thread>#include <grpcpp/grpcpp.h>#include <unistd.h>using namespace std;using namespace grpc;void threadFunc(shared_ptr<Channel> ch, CompletionQueue* cq) { void* tag = NULL; bool ok = false; int i = 1; grpc_connectivity_state state = ch->GetState(false); std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now(); std::chrono::time_point<std::chrono::system_clock> deadline = now + std::chrono::seconds(2); cout << "state " << i++ << " = " << (int)state << endl; ch->NotifyOnStateChange(state, //gpr_inf_future(GPR_CLOCK_MONOTONIC), deadline, cq, (void*)1); while (cq->Next(&tag, &ok)) { state = ch->GetState(false); cout << "state " << i++ << " = " << (int)state << endl; now = std::chrono::system_clock::now(); deadline = now + std::chrono::seconds(2); ch->NotifyOnStateChange(state, //gpr_inf_future(GPR_CLOCK_MONOTONIC), deadline, cq, (void*)1); } cout << "thread end" << endl;}int main(int argc, char* argv[]) { ChannelArguments channel_args; CompletionQueue cq; channel_args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); channel_args.SetInt(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000); channel_args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000); channel_args.SetInt(GRPC_ARG_HTTP2_BDP_PROBE, 0); channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000); channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 30000); channel_args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 60000); { shared_ptr<Channel> ch(CreateCustomChannel("my_grpc_server:50051", InsecureChannelCredentials(), channel_args)); std::thread my_thread(&threadFunc, ch, &cq); cout << "sleeping" << endl; sleep(5); cout << "slept" << endl; cq.Shutdown(); cout << "shut down cq" << endl; my_thread.join(); }}* -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscr...@googlegroups.com. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/c27cb995-04d3-48a3-99da-6321958a29a9n%40googlegroups.com.