I am working on a C++ project, which uses Google Pub/Sub.

As there is no native support for Google Pub/Sub in C++, I am using it 
through gRPC. Thus, I have generated corresponding pubsub.grpc.pb.h, 
pubsub.grpc.pb.cc, pubsub.pb.h and pubsub.pb.cc files via protoc.

I wrote a lightweight wrapper-class, for subscription management. Class 
basically creates a new thread and starts listening for new messages. Here 
is the code example (code was built based on this 
<https://stackoverflow.com/questions/56260017/consumer-example-for-google-pub-sub-in-c>
 
question):


class Consumer
{
public:
    Consumer(); 
    ~Consumer();
    void startConsume();
// ... 
    std::string m_subscriptionName;
    std::unique_ptr<std::thread> m_thread;
    std::shared_ptr<grpc::Channel> m_channel;
    std::unique_ptr<google::pubsub::v1::Subscriber::Stub> m_stub; 
    std::atomic<bool> m_runThread;
}; 

Consumer::Consumer()
{
    m_channel = grpc::CreateChannel("pubsub.googleapis.com:443", 
grpc::GoogleDefaultCredentials()); 
    m_stub = google::pubsub::v1::Subscriber::NewStub(m_channel);
    m_subscriptionName = "something";
}

Consumer::~Consumer()
{
    m_runThread = false;
    if (m_thread && m_thread->joinable())
    {
        m_thread->join();
    }
} 

void Consumer::startConsume()
{
    m_thread.reset(new std::thread([this]() { 
        m_runThread = true; 
        while (m_runThread) 
        {
            grpc::ClientContext context;
            
std::unique_ptr<grpc::ClientReaderWriter<google::pubsub::v1::StreamingPullRequest,
 
                                                     
google::pubsub::v1::StreamingPullResponse>> 
stream(m_stub->StreamingPull(&context));
            // send the initial message 
            google::pubsub::v1::StreamingPullRequest req; 
            req.set_subscription(m_subscriptionName);
            req.set_stream_ack_deadline_seconds(10);
            
            // if write passed successfully, start subscription 
            if (!stream->Write(req))
                continue;

            // receive messages 
            google::pubsub::v1::StreamingPullResponse response;
            while (stream->Read(&response)) 
            { 
                google::pubsub::v1::StreamingPullRequest ack_request;
                for (const auto& message : response.received_messages()) 
                { 
                    // process messages ... 
                    ack_request.add_ack_ids(message.ack_id()); 
                } 
                stream->Write(ack_request);
            } 
        }
    })); 
}


Several instances of the Consumer class are created within a process, each for 
different topic.

It seems works fine. However sometimes program stucks on 
stream->Read(&response)
code. Debugging showed that thread was stuck inside of Read() function call - 
the stream does not read anything and does not exit from function either, 
despite that Pub/Sub buffer is not empty. 

After restarting the application, all messages are successfully read. It seems 
like a deadlock inside of Read().

Is there anything that I am doing wrong? What can cause this behavior?

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/675ef359-1311-4e1d-b69e-65f48f899427%40googlegroups.com.

Reply via email to