I am working on a C++ project, which uses Google Pub/Sub.
As there is no native support for Google Pub/Sub in C++, I am using it through gRPC. Thus, I have generated corresponding pubsub.grpc.pb.h, pubsub.grpc.pb.cc, pubsub.pb.h and pubsub.pb.cc files via protoc. I wrote a lightweight wrapper-class, for subscription management. Class basically creates a new thread and starts listening for new messages. Here is the code example (code was built based on this <https://stackoverflow.com/questions/56260017/consumer-example-for-google-pub-sub-in-c> question): class Consumer { public: Consumer(); ~Consumer(); void startConsume(); // ... std::string m_subscriptionName; std::unique_ptr<std::thread> m_thread; std::shared_ptr<grpc::Channel> m_channel; std::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub; std::atomic<bool> m_runThread; }; Consumer::Consumer() { m_channel = grpc::CreateChannel("pubsub.googleapis.com:443", grpc::GoogleDefaultCredentials()); m_stub = google::pubsub::v1::Subscriber::NewStub(m_channel); m_subscriptionName = "something"; } Consumer::~Consumer() { m_runThread = false; if (m_thread && m_thread->joinable()) { m_thread->join(); } } void Consumer::startConsume() { m_thread.reset(new std::thread([this]() { m_runThread = true; while (m_runThread) { grpc::ClientContext context; std::unique_ptr<grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest, google::pubsub::v1::StreamingPullResponse>> stream(m_stub->StreamingPull(&context)); // send the initial message google::pubsub::v1::StreamingPullRequest req; req.set_subscription(m_subscriptionName); req.set_stream_ack_deadline_seconds(10); // if write passed successfully, start subscription if (!stream->Write(req)) continue; // receive messages google::pubsub::v1::StreamingPullResponse response; while (stream->Read(&response)) { google::pubsub::v1::StreamingPullRequest ack_request; for (const auto& message : response.received_messages()) { // process messages ... ack_request.add_ack_ids(message.ack_id()); } stream->Write(ack_request); } } })); } Several instances of the Consumer class are created within a process, each for different topic. It seems works fine. However sometimes program stucks on stream->Read(&response) code. Debugging showed that thread was stuck inside of Read() function call - the stream does not read anything and does not exit from function either, despite that Pub/Sub buffer is not empty. After restarting the application, all messages are successfully read. It seems like a deadlock inside of Read(). Is there anything that I am doing wrong? What can cause this behavior? -- You received this message because you are subscribed to the Google Groups "grpc.io" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/675ef359-1311-4e1d-b69e-65f48f899427%40googlegroups.com.
