This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 4738991 Bug fixes/Improvement for notify pending receive method (#3337) 4738991 is described below commit 47389910c92c6e45d71fa2ddb67a342f8351235e Author: Ezequiel Lovelle <ezequiellove...@gmail.com> AuthorDate: Tue Jan 15 12:15:55 2019 -0300 Bug fixes/Improvement for notify pending receive method (#3337) ### Motivation Fix 2 bugs in, and refactor, the method [notifyPendingReceivedCallback()](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L912) ### Modifications - Bugfix: the interceptor event was missed when message prefetching is disabled by the consumer (`ReceiverQueueSize == 0`). - Bugfix: when the message is null and no exception is present, the pending receive future is now completed exceptionally. Prior to this commit, the null check was done with `checkNotNull()`, which left the already-polled receive future without completion, resulting in a hung future. - Refactor the control flow to favour simplicity and readability. - Add unit tests exercising the bug fixes. ### Result Bug fixes and more maintainable code. 
--- .../apache/pulsar/client/impl/ConsumerImpl.java | 57 ++++++--- .../pulsar/client/impl/ConsumerImplTest.java | 136 +++++++++++++++++++++ 2 files changed, 173 insertions(+), 20 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6cc5ee2..97f7f4a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -912,27 +912,44 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle * @param message */ void notifyPendingReceivedCallback(final Message<T> message, Exception exception) { - if (!pendingReceives.isEmpty()) { - // fetch receivedCallback from queue - CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll(); - if (exception == null) { - checkNotNull(message, "received message can't be null"); - if (receivedFuture != null) { - if (conf.getReceiverQueueSize() == 0) { - // return message to receivedCallback - receivedFuture.complete(message); - } else { - // increase permits for available message-queue - Message<T> interceptMsg = beforeConsume(message); - messageProcessed(interceptMsg); - // return message to receivedCallback - listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg)); - } - } - } else { - listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); - } + if (pendingReceives.isEmpty()) { + return; + } + + // fetch receivedCallback from queue + final CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll(); + if (receivedFuture == null) { + return; + } + + if (exception != null) { + listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception)); + return; + } + + if (message == null) { + IllegalStateException e = new IllegalStateException("received message can't be null"); + listenerExecutor.execute(() -> 
receivedFuture.completeExceptionally(e)); + return; } + + if (conf.getReceiverQueueSize() == 0) { + // call interceptor and complete received callback + interceptAndComplete(message, receivedFuture); + return; + } + + // increase permits for available message-queue + messageProcessed(message); + // call interceptor and complete received callback + interceptAndComplete(message, receivedFuture); + } + + private void interceptAndComplete(final Message<T> message, final CompletableFuture<Message<T>> receivedFuture) { + // call proper interceptor + final Message<T> interceptMessage = beforeConsume(message); + // return message to receivedCallback + listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage)); } private void triggerZeroQueueSizeListener(final Message<T> message) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java new file mode 100644 index 0000000..657b4f4 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.mockito.Mockito.*; + +public class ConsumerImplTest { + + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private ConsumerImpl<ConsumerImpl> consumer; + private ConsumerConfigurationData consumerConf; + + @BeforeMethod + public void setUp() { + consumerConf = new ConsumerConfigurationData<>(); + ClientConfigurationData clientConf = new ClientConfigurationData(); + PulsarClientImpl client = mock(PulsarClientImpl.class); + CompletableFuture<ClientCnx> clientCnxFuture = new CompletableFuture<>(); + CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>(); + String topic = "non-persistent://tenant/ns1/my-topic"; + + // Mock connection for grabCnx() + when(client.getConnection(anyString())).thenReturn(clientCnxFuture); + clientConf.setOperationTimeoutMs(100); + clientConf.setStatsIntervalSeconds(0); + when(client.getConfiguration()).thenReturn(clientConf); + + consumerConf.setSubscriptionName("test-sub"); + consumer = new ConsumerImpl<ConsumerImpl>(client, topic, consumerConf, + executorService, -1, subscribeFuture, null, null); + } + + @Test(invocationTimeOut = 1000) + public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() { + consumer.notifyPendingReceivedCallback(null, null); + } + + @Test(invocationTimeOut = 1000) + public void 
testNotifyPendingReceivedCallback_CompleteWithException() { + CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>(); + consumer.pendingReceives.add(receiveFuture); + Exception exception = new PulsarClientException.InvalidMessageException("some random exception"); + consumer.notifyPendingReceivedCallback(null, exception); + + try { + receiveFuture.join(); + } catch (CompletionException e) { + // Completion exception must be the same we provided at calling time + Assert.assertEquals(e.getCause(), exception); + } + + Assert.assertTrue(receiveFuture.isCompletedExceptionally()); + } + + @Test(invocationTimeOut = 1000) + public void testNotifyPendingReceivedCallback_CompleteWithExceptionWhenMessageIsNull() { + CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>(); + consumer.pendingReceives.add(receiveFuture); + consumer.notifyPendingReceivedCallback(null, null); + + try { + receiveFuture.join(); + } catch (CompletionException e) { + Assert.assertEquals("received message can't be null", e.getCause().getMessage()); + } + + Assert.assertTrue(receiveFuture.isCompletedExceptionally()); + } + + @Test(invocationTimeOut = 1000) + public void testNotifyPendingReceivedCallback_InterceptorsWorksWithPrefetchDisabled() { + CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>(); + MessageImpl message = mock(MessageImpl.class); + ConsumerImpl<ConsumerImpl> spy = spy(consumer); + + consumer.pendingReceives.add(receiveFuture); + consumerConf.setReceiverQueueSize(0); + doReturn(message).when(spy).beforeConsume(any()); + spy.notifyPendingReceivedCallback(message, null); + Message<ConsumerImpl> receivedMessage = receiveFuture.join(); + + verify(spy, times(1)).beforeConsume(message); + Assert.assertTrue(receiveFuture.isDone()); + Assert.assertFalse(receiveFuture.isCompletedExceptionally()); + Assert.assertEquals(receivedMessage, message); + } + + @Test(invocationTimeOut = 1000) + public void 
testNotifyPendingReceivedCallback_WorkNormally() { + CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>(); + MessageImpl message = mock(MessageImpl.class); + ConsumerImpl<ConsumerImpl> spy = spy(consumer); + + consumer.pendingReceives.add(receiveFuture); + doReturn(message).when(spy).beforeConsume(any()); + doNothing().when(spy).messageProcessed(message); + spy.notifyPendingReceivedCallback(message, null); + Message<ConsumerImpl> receivedMessage = receiveFuture.join(); + + verify(spy, times(1)).beforeConsume(message); + verify(spy, times(1)).messageProcessed(message); + Assert.assertTrue(receiveFuture.isDone()); + Assert.assertFalse(receiveFuture.isCompletedExceptionally()); + Assert.assertEquals(receivedMessage, message); + } +}