You can turn on some debug tracing with environment variable GRPC_TRACE, 
for example setting it to "http,secure_endpoint,tcp". For a complete list, 
refer to: 
https://github.com/grpc/grpc/blob/master/doc/environment_variables.md

On Saturday, October 26, 2019 at 11:24:07 AM UTC-7 [email protected] 
wrote:

> I am working on a C++ project, which uses Google Pub/Sub.
>
> As there is no native support for Google Pub/Sub in C++, I am using it 
> through gRPC. Thus, I have generated corresponding pubsub.grpc.pb.h, 
> pubsub.grpc.pb.cc, pubsub.pb.h and pubsub.pb.cc files via protoc.
>
> I wrote a lightweight wrapper-class, for subscription management. Class 
> basically creates a new thread and starts listening for new messages. Here 
> is the code example (code was built based on this 
> <https://stackoverflow.com/questions/56260017/consumer-example-for-google-pub-sub-in-c>
>  
> question):
>
>
> class Consumer
> {
> public:
>     Consumer(); 
>     ~Consumer();
>     void startConsume();
> // ... 
>     std::string m_subscriptionName;
>     std::unique_ptr<std::thread> m_thread;
>     std::shared_ptr<grpc::Channel> m_channel;
>     std::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub; 
>     std::atomic<bool> m_runThread;
> }; 
>
> Consumer::Consumer()
> {
>     m_channel = grpc::CreateChannel("pubsub.googleapis.com:443", 
> grpc::GoogleDefaultCredentials()); 
>     m_stub = google::pubsub::v1::Subscriber::NewStub(m_channel);
>     m_subscriptionName = "something";
> }
>
> Consumer::~Consumer()
> {
>     m_runThread = false;
>     if (m_thread && m_thread->joinable())
>     {
>         m_thread->join();
>     }
> } 
>
> void Consumer::startConsume()
> {
>     m_thread.reset(new std::thread([this]() { 
>         m_runThread = true; 
>         while (m_runThread) 
>         {
>             grpc::ClientContext context;
>             
> std::unique_ptr<grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest,
>  
>                                                      
> google::pubsub::v1::StreamingPullResponse>> 
> stream(m_stub->StreamingPull(&context));
>             // send the initial message 
>             google::pubsub::v1::StreamingPullRequest req; 
>             req.set_subscription(m_subscriptionName);
>             req.set_stream_ack_deadline_seconds(10);
>             
>             // if write passed successfully, start subscription 
>             if (!stream->Write(req))
>                 continue;
>
>             // receive messages 
>             google::pubsub::v1::StreamingPullResponse response;
>             while (stream->Read(&response)) 
>             { 
>                 google::pubsub::v1::StreamingPullRequest ack_request;
>                 for (const auto& message : response.received_messages()) 
>                 { 
>                     // process messages ... 
>                     ack_request.add_ack_ids(message.ack_id()); 
>                 } 
>                 stream->Write(ack_request);
>             } 
>         }
>     })); 
> }
>
>
> Several instances of the Consumer class are created within a process, each 
> for different topic.
>
> It seems works fine. However sometimes program stucks on 
> stream->Read(&response)
> code. Debugging showed that thread was stuck inside of Read() function call - 
> the stream does not read anything and does not exit from function either, 
> despite that Pub/Sub buffer is not empty. 
>
> After restarting the application, all messages are successfully read. It 
> seems like a deadlock inside of Read().
>
> Is there anything that I am doing wrong? What can cause this behavior?
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/4e8d0d3c-1128-472b-b746-b9f027ab91b7%40googlegroups.com.

Reply via email to