This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit caf82b5404a8c7dcc00e9101dc4848ef0c8fe2f3
Author: hrsakai <[email protected]>
AuthorDate: Fri Jan 24 12:24:58 2020 +0900

    Fix zeroQueueConsumer using listener (#6106)
    
    ### Motivation
    
    Available permits of a ZeroQueueConsumer must be 1 or less; however, the 
available permits of a ZeroQueueConsumer using a listener may become greater than 1.
    
    
    ### Modifications
    
    If the listener is processing a message, ZeroQueueConsumer doesn't send a permit 
when it reconnects to the broker.
    
    
    ### Reproduction
    1. A ZeroQueueConsumer using a listener consumes a topic.
    
    2. Unload that topic (or restart a broker) while the listener is processing 
a message.
    
    3. ZeroQueueConsumer sends a permit when it reconnects to the broker.
    
https://github.com/apache/pulsar/blob/v2.5.0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L133
    
    4. ZeroQueueConsumer also sends a permit when the listener has finished processing the message.
    
https://github.com/apache/pulsar/blob/v2.5.0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L163
    
    5. Available permits become 2.
---
 .../java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 44f3425..94c8dd3 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -45,6 +45,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> 
{
     private final Lock zeroQueueLock = new ReentrantLock();
 
     private volatile boolean waitingOnReceiveForZeroQueueSize = false;
+    private volatile boolean waitingOnListenerForZeroQueueSize = false;
 
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, 
ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, boolean 
hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
@@ -131,7 +132,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
         // or queue was not empty: send a flow command
         if (waitingOnReceiveForZeroQueueSize
                 || currentQueueSize > 0
-                || listener != null) {
+                || (listener != null && !waitingOnListenerForZeroQueueSize)) {
             sendFlowPermitsToBroker(cnx, 1);
         }
     }
@@ -157,6 +158,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
                     log.debug("[{}][{}] Calling message listener for unqueued 
message {}", topic, subscription,
                             message.getMessageId());
                 }
+                waitingOnListenerForZeroQueueSize = true;
                 trackMessage(message);
                 listener.received(ZeroQueueConsumerImpl.this, 
beforeConsume(message));
             } catch (Throwable t) {
@@ -164,6 +166,7 @@ public class ZeroQueueConsumerImpl<T> extends 
ConsumerImpl<T> {
                         message.getMessageId(), t);
             }
             increaseAvailablePermits(cnx());
+            waitingOnListenerForZeroQueueSize = false;
         });
     }
 

Reply via email to