This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1c3c3f40b0d58108a4d0f53489f272f8468bed65 Author: lipenghui <[email protected]> AuthorDate: Sat Feb 12 13:26:42 2022 +0800 Fix race condition of OpSendMsgQueue when publishing messages (#14231) * Add synchronized for getPendingQueueSize(); * Use iterator instead. * Use counter to keep track of messages count * Changed to int Co-authored-by: Matteo Merli <[email protected]> --- .../apache/pulsar/client/impl/ProducerImpl.java | 34 ++++++++++------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 877d88b..8486283 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; @@ -994,7 +993,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (sequenceId > op.sequenceId) { log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName, - op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size()); + op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.messagesCount()); // Force connection closing so that messages can be re-transmitted in a new connection cnx.channel().close(); return; @@ -1016,7 +1015,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne releaseSemaphoreForSendOp(op); } else { log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName, - op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size()); + op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.messagesCount()); // Force connection closing so that messages can be re-transmitted in a new connection cnx.channel().close(); return; @@ -1332,6 +1331,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private final Queue<OpSendMsg> delegate = new ArrayDeque<>(); private int forEachDepth = 0; private List<OpSendMsg> postponedOpSendMgs; + private final AtomicInteger messagesCount = new AtomicInteger(0); @Override public void forEach(Consumer<? super OpSendMsg> action) { @@ -1352,6 +1352,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne public boolean add(OpSendMsg o) { // postpone adding to the queue while forEach iteration is in progress + messagesCount.addAndGet(o.numMessagesInBatch); if (forEachDepth > 0) { if (postponedOpSendMgs == null) { postponedOpSendMgs = new ArrayList<>(); @@ -1364,18 +1365,22 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne public void clear() { delegate.clear(); + messagesCount.set(0); } public void remove() { - delegate.remove(); + OpSendMsg op = delegate.remove(); + if (op != null) { + messagesCount.addAndGet(-op.numMessagesInBatch); + } } public OpSendMsg peek() { return delegate.peek(); } - public int size() { - return delegate.size(); + public int messagesCount() { + return messagesCount.get(); } @Override @@ -1548,7 +1553,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (log.isDebugEnabled()) { log.debug("[{}] [{}] Pending messages: {}", topic, producerName, - pendingMessages.size()); + pendingMessages.messagesCount()); } PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException( @@ -1649,7 +1654,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne cnx.channel().close(); return; } - int messagesToResend = pendingMessages.size(); + int messagesToResend = pendingMessages.messagesCount(); if (messagesToResend == 0) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] No pending messages to resend {}", topic, producerName, messagesToResend); @@ -1755,7 +1760,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // The diff is less than or equal to zero, meaning that the message has been timed out. // Set the callback to timeout on every message, then clear the pending queue. log.info("[{}] [{}] Message send timed out. Failing {} messages", topic, producerName, - pendingMessages.size()); + pendingMessages.messagesCount()); PulsarClientException te = new PulsarClientException.TimeoutException( format("The producer %s can not send message to the topic %s within given timeout", @@ -1925,7 +1930,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // called again once the new connection registers the producer with the broker. log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the " + " {} pending messages since they will deliver using another connection.", topic, producerName, - pendingMessages.size()); + pendingMessages.messagesCount()); return; } final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion(); @@ -1997,14 +2002,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } public int getPendingQueueSize() { - if (!isBatchMessagingEnabled()) { - return pendingMessages.size(); - } - MutableInt size = new MutableInt(0); - pendingMessages.forEach(op -> { - size.add(Math.max(op.numMessagesInBatch, 1)); - }); - return size.getValue(); + return pendingMessages.messagesCount(); } @Override
