This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2f68615 C++ client producer sendAsync() method will be blocked
forever, if enough batched messages sent timeout. (#4569) (#4657)
2f68615 is described below
commit 2f6861579c0bfc370abd9ecf5409c683eaa96ee7
Author: Easyfan Zheng <[email protected]>
AuthorDate: Thu Jul 4 03:16:56 2019 +0800
C++ client producer sendAsync() method will be blocked forever, if enough
batched messages sent timeout. (#4569) (#4657)
---
pulsar-client-cpp/lib/ProducerImpl.cc | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc
b/pulsar-client-cpp/lib/ProducerImpl.cc
index b2e7848..bf9e3ac 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -232,6 +232,14 @@ void ProducerImpl::failPendingMessages(Result result) {
// without holding producer mutex.
for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it
!= pendingMessagesQueue_.end();
it++) {
+ // When failing pending messages, if the current message is a batch message, we must also
+ // release the spots reserved in pendingMessagesQueue_ for all of the individual messages
+ // inside this batch. See 'ProducerImpl::sendAsync' for more details.
+ if (it->msg_.impl_->metadata.has_num_messages_in_batch()) {
+ // batch message - need to release more spots
+ // -1 since pushing the batch message into the queue already released a spot
+
pendingMessagesQueue_.release(it->msg_.impl_->metadata.num_messages_in_batch()
- 1);
+ }
messagesToFail.push_back(*it);
}