This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3c0715ec61260bbc31ceb96c6a6721fd5142496
Author: feynmanlin <[email protected]>
AuthorDate: Sat May 15 09:59:04 2021 +0800

    Make failPendingMessages called from within the ProducerImpl object mutex 
(#10528)
    
    `failPendingMessages()` traverses the `pendingMessages` queue, and that 
traversal is not atomic.
    `failPendingMessages()` must therefore be called while holding the 
`ProducerImpl` object mutex.
    
    
https://github.com/apache/pulsar/blob/4d2d66d17da73426e2281251f3b05a63218b70fb/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1602-L1608
    
    `ProducerImpl` calls this method in 6 places, but 3 of those calls are 
made without holding the lock.
    
    Add the object mutex (`synchronized (this)`) around unlocked 
`failPendingMessages()` calls.
    
    (cherry picked from commit cd7e3c05ce0ee64341341541a965df3305e6bfc1)
---
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java    | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 786af84..40eba48 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -912,10 +912,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         if (previousState != State.Terminated && previousState != 
State.Closed) {
             log.info("[{}] [{}] The topic has been terminated", topic, 
producerName);
             setClientCnx(null);
-
-            failPendingMessages(cnx,
-                new PulsarClientException.TopicTerminatedException(
-                    format("The topic %s that the producer %s produces to has 
been terminated", topic, producerName)));
+            synchronized (this) {
+                failPendingMessages(cnx,
+                        new PulsarClientException.TopicTerminatedException(
+                                format("The topic %s that the producer %s 
produces to has been terminated", topic, producerName)));
+            }
         }
     }
 
@@ -1387,7 +1388,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
                     if (cause instanceof 
PulsarClientException.TopicTerminatedException) {
                         setState(State.Terminated);
-                        failPendingMessages(cnx(), (PulsarClientException) 
cause);
+                        synchronized (this) {
+                            failPendingMessages(cnx(), (PulsarClientException) 
cause);
+                        }
                         producerCreatedFuture.completeExceptionally(cause);
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //

Reply via email to