This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d3c0715ec61260bbc31ceb96c6a6721fd5142496 Author: feynmanlin <[email protected]> AuthorDate: Sat May 15 09:59:04 2021 +0800 Make failPendingMessages called from within the ProducerImpl object mutex (#10528) `failPendingMessages()` will traverse the `pendingMessages` queue and this operation is not atomic. `failPendingMessages()` should be called from within the `ProducerImpl` object mutex. https://github.com/apache/pulsar/blob/4d2d66d17da73426e2281251f3b05a63218b70fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1602-L1608 There are 6 places in ` ProducerImpl ` that use this method, but 3 places are not locked. Add object mutex (cherry picked from commit cd7e3c05ce0ee64341341541a965df3305e6bfc1) --- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 786af84..40eba48 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -912,10 +912,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (previousState != State.Terminated && previousState != State.Closed) { log.info("[{}] [{}] The topic has been terminated", topic, producerName); setClientCnx(null); - - failPendingMessages(cnx, - new PulsarClientException.TopicTerminatedException( - format("The topic %s that the producer %s produces to has been terminated", topic, producerName))); + synchronized (this) { + failPendingMessages(cnx, + new PulsarClientException.TopicTerminatedException( + format("The topic %s that the producer %s produces to has been terminated", topic, producerName))); + } } } @@ -1387,7 +1388,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (cause instanceof PulsarClientException.TopicTerminatedException) { setState(State.Terminated); - failPendingMessages(cnx(), (PulsarClientException) cause); + synchronized (this) { + failPendingMessages(cnx(), (PulsarClientException) cause); + } producerCreatedFuture.completeExceptionally(cause); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || //
