BewareMyPower opened a new issue, #265: URL: https://github.com/apache/pulsar-client-cpp/issues/265
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar-client-cpp/issues) and found nothing similar.

### Version

- Pulsar: 2.11.0
- Client: master (4338d45)

### Minimal reproduce step

```c++
#include <pulsar/Client.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <thread>

using namespace pulsar;

static std::mutex gMutex;
static std::atomic_bool gDuringIntercept{false};

class MyConsumerIntercepter : public ConsumerInterceptor {
   public:
    Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; }

    void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {}

    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
                                 const MessageId& messageID) override {}

    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {
        auto id = std::this_thread::get_id();
        std::ostringstream oss;
        oss << id << " onNegativeAcksSend for ";
        for (auto&& msgId : messageIds) {
            oss << " " << msgId;
        }
        std::cout << oss.str() << std::endl;
        gDuringIntercept = true;
        // Give the main thread time to call `consumer.negativeAcknowledge` while it holds gMutex
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::lock_guard<std::mutex> lock{gMutex};
        std::cout << id << " onNegativeAcksSend acquired gMutex" << std::endl;
    }
};

int main() {
    Client client("pulsar://localhost:6650");
    const auto topic = "my-topic";
    auto interceptor = std::make_shared<MyConsumerIntercepter>();

    ConsumerConfiguration conf;
    conf.setNegativeAckRedeliveryDelayMs(1000);
    conf.intercept({interceptor});
    Consumer consumer;
    client.subscribe(topic, "sub", conf, consumer);

    Producer producer;
    client.createProducer(topic, producer);
    producer.send(MessageBuilder().setContent("content").build());

    Message msg;
    consumer.receive(msg);
    consumer.negativeAcknowledge(msg);

    // Wait for the negative acknowledge timer to trigger the interceptor
    while (!gDuringIntercept) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }

    // Deadlock happens here
    {
        std::lock_guard<std::mutex> lock{gMutex};
        std::cout << std::this_thread::get_id() << " Acquired gMutex, call negativeAcknowledge" << std::endl;
        consumer.negativeAcknowledge(msg);
    }

    client.close();
}
```

### What did you expect to see?

The application should exit after a few seconds.

### What did you see instead?

The application hangs forever. Here are the key stacks that show the deadlock:

```
Thread 2 (Thread 0x7fb597771700 (LWP 16231)):
#0 __lll_lock_wait (futex=futex@entry=0x559b17124180 <gMutex>, private=0) at lowlevellock.c:52
#1 0x00007fb598f910a3 in __GI___pthread_mutex_lock (mutex=0x559b17124180 <gMutex>) at ../nptl/pthread_mutex_lock.c:80
#2 0x0000559b1711cefc in __gthread_mutex_lock (__mutex=0x559b17124180 <gMutex>) at /usr/include/x86_64-linux-gnu/c++/9/bits/gthr-default.h:749
#3 0x0000559b1711d184 in std::mutex::lock (this=0x559b17124180 <gMutex>) at /usr/include/c++/9/bits/std_mutex.h:100
#4 0x0000559b1711dda0 in std::lock_guard<std::mutex>::lock_guard (this=0x7fb59776fd88, __m=...) at /usr/include/c++/9/bits/std_mutex.h:159
#5 0x0000559b1711d5b2 in MyConsumerIntercepter::onNegativeAcksSend (this=0x559b17f1daa0, consumer=..., messageIds=Python Exception <class 'AttributeError'> 'NoneType' object has no attribute 'pointer': std::set with 1 element) at /home/xyz/pulsar-client-cpp/examples/SampleProducer.cc:52

Thread 1 (Thread 0x7fb59777cc00 (LWP 16230)):
#0 __lll_lock_wait (futex=futex@entry=0x7fb590003df0, private=0) at lowlevellock.c:52
#1 0x00007fb598f910a3 in __GI___pthread_mutex_lock (mutex=0x7fb590003df0) at ../nptl/pthread_mutex_lock.c:80
#2 0x0000559b1711cefc in __gthread_mutex_lock (__mutex=0x7fb590003df0) at /usr/include/x86_64-linux-gnu/c++/9/bits/gthr-default.h:749
#3 0x0000559b1711d184 in std::mutex::lock (this=0x7fb590003df0) at /usr/include/c++/9/bits/std_mutex.h:100
#4 0x0000559b1711dda0 in std::lock_guard<std::mutex>::lock_guard (this=0x7ffe098062d0, __m=...) at /usr/include/c++/9/bits/std_mutex.h:159
#5 0x00007fb5997ac01f in pulsar::NegativeAcksTracker::add (this=0x7fb590003de8, m=...) at /home/xyz/pulsar-client-cpp/lib/NegativeAcksTracker.cc:90
#6 0x00007fb5996ef3e4 in pulsar::ConsumerImpl::negativeAcknowledge (this=0x7fb590002f20, messageId=...) at /home/xyz/pulsar-client-cpp/lib/ConsumerImpl.cc:1198
```

### Anything else?

Although the reproduction code uses the interceptor feature, which is not included in any release yet, this bug also affects the Python client, which depends on the existing C++ client releases. See the detailed analysis in https://github.com/apache/pulsar-client-python/issues/116#issuecomment-1537049348

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
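Reading the two stacks together, this looks like a lock-order inversion: Thread 1 holds `gMutex` and waits for the mutex inside `NegativeAcksTracker::add`, while Thread 2 (the negative-ack timer) waits for `gMutex` inside `onNegativeAcksSend`, presumably because the timer invokes the interceptor while still holding the tracker's mutex. A common way to break such a cycle is to move the pending message IDs out under the internal lock and call user callbacks only after releasing it. The sketch below illustrates that pattern with a hypothetical `NackTrackerSketch` class and plain `std::int64_t` IDs instead of `MessageId`; it is not the actual `pulsar::NegativeAcksTracker` code and not necessarily what the eventual fix does.

```c++
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <functional>
#include <mutex>
#include <set>
#include <thread>
#include <utility>

// Hypothetical stand-in for a negative-ack tracker (NOT the real
// pulsar::NegativeAcksTracker). It only illustrates the locking pattern.
class NackTrackerSketch {
   public:
    using Callback = std::function<void(const std::set<std::int64_t>&)>;

    explicit NackTrackerSketch(Callback onSend) : onSend_(std::move(onSend)) {}

    // Application thread, comparable to Consumer::negativeAcknowledge().
    void add(std::int64_t msgId) {
        std::lock_guard<std::mutex> lock{mutex_};
        pending_.insert(msgId);
    }

    // Timer thread: move the pending ids out under the lock, then invoke the
    // user callback only after the lock has been released.
    std::set<std::int64_t> handleTimer() {
        std::set<std::int64_t> toSend;
        {
            std::lock_guard<std::mutex> lock{mutex_};
            toSend.swap(pending_);
        }  // mutex_ is released here
        if (!toSend.empty() && onSend_) {
            onSend_(toSend);  // may block on user locks without stalling add()
        }
        return toSend;
    }

    std::set<std::int64_t> pendingIds() {
        std::lock_guard<std::mutex> lock{mutex_};
        return pending_;
    }

   private:
    std::mutex mutex_;
    std::set<std::int64_t> pending_;
    Callback onSend_;
};

// Replays the timing of the reproducer: the timer thread is inside the
// callback waiting for userMutex (the role of gMutex) while the application
// thread holds userMutex and calls add(). Returns {ids passed to the
// callback, ids still pending afterwards}.
std::pair<std::set<std::int64_t>, std::set<std::int64_t>> replayIssueTiming() {
    std::mutex userMutex;
    std::atomic_bool inCallback{false};
    std::set<std::int64_t> received;

    NackTrackerSketch tracker{[&](const std::set<std::int64_t>& ids) {
        inCallback = true;
        std::lock_guard<std::mutex> lock{userMutex};
        received = ids;
    }};
    tracker.add(1);

    std::unique_lock<std::mutex> userLock{userMutex};
    std::thread timer{[&tracker] { tracker.handleTimer(); }};
    while (!inCallback) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    tracker.add(2);  // would block forever if the callback ran under mutex_
    userLock.unlock();
    timer.join();
    return {received, tracker.pendingIds()};
}
```

Calling `replayIssueTiming()` returns promptly, with `{1}` delivered to the callback and `{2}` still pending. Moving the `onSend_(toSend)` call inside the locked scope of `handleTimer()` recreates the same two-thread cycle as the stacks in this issue, and the replay then hangs.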
