You can turn on some debug tracing with the GRPC_TRACE environment variable, for example by setting it to "http,secure_endpoint,tcp". For a complete list of tracers, refer to: https://github.com/grpc/grpc/blob/master/doc/environment_variables.md

On Saturday, October 26, 2019 at 11:24:07 AM UTC-7 [email protected] wrote:

> I am working on a C++ project which uses Google Pub/Sub.
>
> As there is no native support for Google Pub/Sub in C++, I am using it
> through gRPC. Thus, I have generated the corresponding pubsub.grpc.pb.h,
> pubsub.grpc.pb.cc, pubsub.pb.h and pubsub.pb.cc files via protoc.
>
> I wrote a lightweight wrapper class for subscription management. The class
> basically creates a new thread and starts listening for new messages. Here
> is the code example (the code was built based on this
> <https://stackoverflow.com/questions/56260017/consumer-example-for-google-pub-sub-in-c>
> question):
>
> class Consumer
> {
> public:
>     Consumer();
>     ~Consumer();
>     void startConsume();
>     // ...
>     std::string m_subscriptionName;
>     std::unique_ptr<std::thread> m_thread;
>     std::shared_ptr<grpc::Channel> m_channel;
>     std::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub;
>     std::atomic<bool> m_runThread;
> };
>
> Consumer::Consumer()
> {
>     m_channel = grpc::CreateChannel("pubsub.googleapis.com:443",
>                                     grpc::GoogleDefaultCredentials());
>     m_stub = google::pubsub::v1::Subscriber::NewStub(m_channel);
>     m_subscriptionName = "something";
> }
>
> Consumer::~Consumer()
> {
>     m_runThread = false;
>     if (m_thread && m_thread->joinable())
>     {
>         m_thread->join();
>     }
> }
>
> void Consumer::startConsume()
> {
>     m_thread.reset(new std::thread([this]() {
>         m_runThread = true;
>         while (m_runThread)
>         {
>             grpc::ClientContext context;
>             std::unique_ptr<grpc::ClientReaderWriter<
>                 google::pubsub::v1::StreamingPullRequest,
>                 google::pubsub::v1::StreamingPullResponse>>
>                 stream(m_stub->StreamingPull(&context));
>
>             // send the initial message
>             google::pubsub::v1::StreamingPullRequest req;
>             req.set_subscription(m_subscriptionName);
>             req.set_stream_ack_deadline_seconds(10);
>
>             // if write passed successfully, start subscription
>             if (!stream->Write(req))
>                 continue;
>
>             // receive messages
>             google::pubsub::v1::StreamingPullResponse response;
>             while (stream->Read(&response))
>             {
>                 google::pubsub::v1::StreamingPullRequest ack_request;
>                 for (const auto& message : response.received_messages())
>                 {
>                     // process messages ...
>                     ack_request.add_ack_ids(message.ack_id());
>                 }
>                 stream->Write(ack_request);
>             }
>         }
>     }));
> }
>
> Several instances of the Consumer class are created within a process, each
> for a different topic.
>
> It seems to work fine. However, sometimes the program gets stuck on the
> stream->Read(&response) call. Debugging showed that the thread was stuck
> inside the Read() function call: the stream does not read anything and
> does not return from the function either, even though the Pub/Sub buffer
> is not empty.
>
> After restarting the application, all messages are read successfully. It
> seems like a deadlock inside of Read().
>
> Is there anything that I am doing wrong? What can cause this behavior?
