On Friday, June 19, 2020 at 6:25:28 PM UTC-7, sathya M wrote:

> As you mentioned, I have done a shutdown as per the documents comment
> given. As per the document given, Completion Queue will get drained only
> after the CQ shutdown.

That's not _quite_ what the documentation says. It says that, after you
invoke CompletionQueue::Shutdown(), _something else_ *must* drain the CQ
before the CQ is destroyed.

Importantly, it does not say that the CQ will be drained.

> what should be done inorder to avoid this kind of failure? It looks like
> grpc has some issue in shutting down CQ.

You'll need to make sure that you've actually drained the completion queue
before deleting it. In the code you posted, I see a possible sequence of
events that can cause a non-empty CQ to be deleted.

1. The OS stops running the thread you created to run ServerImpl::Run
2. main is resumed
3. `delete server' invokes ServerImpl::~ServerImpl
4. This calls server_->Shutdown() and then cq_->Shutdown();
5. After ServerImpl::~ServerImpl is done, the C++ runtime invokes the
   destructors for the ServerImpl member variables, one of which is cq_.
6. The unique_ptr<CompletionQueue>::~unique_ptr destructor deletes the
   completion queue.
7. Assertion failure.

Since ServerImpl::Run isn't running, there's nothing to drain the CQ.

(Also, `delete server' destroys an object while it is still being used on
another thread. You'll want to make sure that thread_ has finished and been
joined before deleting server.)
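
To make the ordering concrete without pulling in gRPC, here is a minimal
sketch built on a toy queue. ToyQueue, ToyServer, and PostThenDestroy are
names I made up for illustration; they are not gRPC classes. The toy only
mimics the contract described above: Next() returns false once the queue is
shut down and empty, and the queue's destructor asserts that it was drained.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a completion queue. NOT the gRPC class.
class ToyQueue {
 public:
  ~ToyQueue() {
    // Destroying a queue that was not shut down and drained is a bug.
    assert(shutdown_ && events_.empty());
  }

  void Push(int tag) {
    std::lock_guard<std::mutex> lock(mu_);
    events_.push_back(tag);
    cv_.notify_one();
  }

  void Shutdown() {
    std::lock_guard<std::mutex> lock(mu_);
    shutdown_ = true;
    cv_.notify_all();
  }

  // Blocks until an event is available. Returns false only once the queue
  // has been shut down AND every pending event has been handed out.
  bool Next(int* tag) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return shutdown_ || !events_.empty(); });
    if (events_.empty()) return false;
    *tag = events_.front();
    events_.pop_front();
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<int> events_;
  bool shutdown_ = false;
};

// Hypothetical owner, loosely shaped like the ServerImpl in this thread.
class ToyServer {
 public:
  explicit ToyServer(std::atomic<int>* handled)
      : handled_(handled), pump_([this] { Pump(); }) {}

  ~ToyServer() {
    cq_.Shutdown();  // 1. tell the queue that no more work is coming
    pump_.join();    // 2. wait until the pump thread has drained it
    // 3. only now do the member destructors run, so cq_ is already empty
  }

  void Post(int tag) { cq_.Push(tag); }

 private:
  void Pump() {
    int tag;
    while (cq_.Next(&tag)) handled_->fetch_add(1);
  }

  ToyQueue cq_;
  std::atomic<int>* handled_;
  std::thread pump_;  // declared last so it starts after cq_ exists
};

// Posts n events, destroys the server, and reports how many were handled.
int PostThenDestroy(int n) {
  std::atomic<int> handled{0};
  {
    ToyServer server(&handled);
    for (int i = 0; i < n; ++i) server.Post(i);
  }  // ~ToyServer: shut down, join, then destroy the queue
  return handled.load();
}
```

If you take the join() out of ~ToyServer, the toy fails in much the same way
as the code you posted: the queue can be destroyed while events are still
pending and while the pump thread is still using it.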

Try something like this instead. It basically gets rid of ServerImpl. Once
you have this working, you can package that up into an object if you want.
(Warning: I wrote this code in my email editor. It may not compile, and it
may not work, but it should help point you in the right direction.)

void PumpCompletionQueue(CompletionQueue* cq) {
  while (true) {
    void* tag;
    bool ok;

    if (cq->Next(&tag, &ok)) {
      // handle dispatching tag: this is the CallData stuff
    } else {
      std::cout << "Queue is drained" << std::endl;
      return;
    }
  }
}

int main() {
  Greeter::AsyncService service;

  ServerBuilder sb;
  // configure as needed
  sb.RegisterService(&service);
  std::unique_ptr<CompletionQueue> cq = sb.AddCompletionQueue();
  std::unique_ptr<Server> server = sb.BuildAndStart();

  std::thread pumpCqThread { &PumpCompletionQueue, cq.get() };

  sleep(30);

  // start the shutdown process
  server->Shutdown();
  cq->Shutdown();
  pumpCqThread.join(); // ensures the CQ has been drained

  // no more RPCs are being handled & the CQ is empty. The objects can be
  // deleted.
  //
  // These calls to .reset aren't strictly needed, as the unique_ptr
  // destructors will run when main exits, but they're here to make it
  // easier to set breakpoints and step through what's going on in the
  // debugger.
  server.reset();
  cq.reset();

  return 0;
}
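
For the "handle dispatching tag" branch, one common approach is to make every
tag a pointer to an object with a known base class, then cast the tag back and
call a method on it. Here is a minimal, gRPC-free sketch of that idea.
TagHandler, CountingHandler, DispatchTag, and DispatchDemo are hypothetical
names for illustration; a real CallData would drive an RPC instead of counting.

```cpp
// Hypothetical base class for anything whose address is used as a tag.
class TagHandler {
 public:
  virtual ~TagHandler() = default;
  // `ok` is meant to mirror the bool that Next() fills in.
  virtual void Proceed(bool ok) = 0;
};

// Hypothetical handler that only counts how it was called.
class CountingHandler : public TagHandler {
 public:
  void Proceed(bool ok) override {
    if (ok) {
      ++succeeded_;
    } else {
      ++failed_;
    }
  }
  int succeeded() const { return succeeded_; }
  int failed() const { return failed_; }

 private:
  int succeeded_ = 0;
  int failed_ = 0;
};

// What the "handle dispatching tag" branch might call.
inline void DispatchTag(void* tag, bool ok) {
  static_cast<TagHandler*>(tag)->Proceed(ok);
}

// Dispatches two successful events and one failed event to one handler.
int DispatchDemo() {
  CountingHandler handler;
  // Convert to TagHandler* before erasing the type, so that the cast in
  // DispatchTag recovers exactly the pointer that was stored.
  void* tag = static_cast<TagHandler*>(&handler);
  DispatchTag(tag, true);
  DispatchTag(tag, true);
  DispatchTag(tag, false);
  return handler.succeeded() * 10 + handler.failed();
}
```

Whatever object a tag points at has to stay alive until its event has come
back out of the queue, which is one more reason to drain the CQ before
tearing anything down.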

Hope this helps!

--
Christopher Warrington
Microsoft Corp.
