This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit caf82b5404a8c7dcc00e9101dc4848ef0c8fe2f3 Author: hrsakai <[email protected]> AuthorDate: Fri Jan 24 12:24:58 2020 +0900 Fix zeroQueueConsumer using listener (#6106) ### Motivation The available permits of a ZeroQueueConsumer must be 1 or less; however, the available permits of a ZeroQueueConsumer using a listener may become greater than 1. ### Modifications If the listener is processing a message, ZeroQueueConsumer doesn't send a permit when it reconnects to the broker. ### Reproduction 1. A ZeroQueueConsumer using a listener consumes a topic. 2. Unload that topic (or restart a broker) while the listener is processing a message. 3. ZeroQueueConsumer sends a permit when it reconnects to the broker. https://github.com/apache/pulsar/blob/v2.5.0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L133 4. ZeroQueueConsumer also sends a permit when it finishes processing the message. https://github.com/apache/pulsar/blob/v2.5.0/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L163 5. Available permits become 2. 
--- .../java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index 44f3425..94c8dd3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -45,6 +45,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { private final Lock zeroQueueLock = new ReentrantLock(); private volatile boolean waitingOnReceiveForZeroQueueSize = false; + private volatile boolean waitingOnListenerForZeroQueueSize = false; public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture, @@ -131,7 +132,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { // or queue was not empty: send a flow command if (waitingOnReceiveForZeroQueueSize || currentQueueSize > 0 - || listener != null) { + || (listener != null && !waitingOnListenerForZeroQueueSize)) { sendFlowPermitsToBroker(cnx, 1); } } @@ -157,6 +158,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { log.debug("[{}][{}] Calling message listener for unqueued message {}", topic, subscription, message.getMessageId()); } + waitingOnListenerForZeroQueueSize = true; trackMessage(message); listener.received(ZeroQueueConsumerImpl.this, beforeConsume(message)); } catch (Throwable t) { @@ -164,6 +166,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> { message.getMessageId(), t); } increaseAvailablePermits(cnx()); + waitingOnListenerForZeroQueueSize = false; }); }
