This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 56839d422287f63bcc3492538cf623a0f6fb7245 Author: Addison Higham <[email protected]> AuthorDate: Fri Feb 12 13:56:32 2021 -0700 [client] Cleanup consumer on multitopic subscribe failure (#9419) Currently, when a multi-topic subscribe fails (via a set of topics or a regex) we can leave consumers connected, as the multitopic consumer doesn't close any of the topics. This means we rely on the client to call closeAsync, otherwise, the consumer is left in partially open state. This fix changes that, and ensures we call close in the case of an exception Co-authored-by: Sijie Guo <[email protected]> (cherry picked from commit cbe9816423a23686bdd2091192f5aa5f8bda152d) --- .../client/impl/MultiTopicsConsumerImpl.java | 9 ++++-- .../pulsar/client/impl/ClientTestFixtures.java | 6 ++++ .../client/impl/MultiTopicsConsumerImplTest.java | 35 ++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 55ee806..8efab22 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -189,8 +189,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { subscribeFuture().complete(MultiTopicsConsumerImpl.this); }) .exceptionally(ex -> { - log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage()); - subscribeFuture.completeExceptionally(ex); + log.warn("[{}] Failed to subscribe topics: {}, closing consumer", topic, ex.getMessage()); + closeAsync().whenComplete((res, closeEx) -> { + if (closeEx != null) { + log.error("[{}] Failed to unsubscribe after failed consumer creation: {}", topic, closeEx.getMessage()); + } + subscribeFuture.completeExceptionally(ex); + }); return null; }); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java index a1653c1..085a4a9 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java @@ -72,6 +72,12 @@ class ClientTestFixtures { return future; } + static <T> CompletableFuture<T> createExceptionFuture(Throwable ex, int delayMillis) { + CompletableFuture<T> future = new CompletableFuture<>(); + SCHEDULER.schedule(() -> future.completeExceptionally(ex), delayMillis, TimeUnit.MILLISECONDS); + return future; + } + public static ExecutorService createMockedExecutor() { return mock(ExecutorService.class); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 767cb65..664e78a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import com.google.common.collect.Sets; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClientException; @@ -31,13 +32,19 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.HashSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Pattern; import static org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture; +import static org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture; import static org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -137,4 +144,32 @@ public class MultiTopicsConsumerImplTest { assertFalse(consumer.hasPendingBatchReceive()); } + @Test + public void testConsumerCleanupOnSubscribeFailure() throws InterruptedException, TimeoutException, ExecutionException { + ExecutorService listenerExecutor = mock(ExecutorService.class); + ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>(); + consumerConfData.setSubscriptionName("subscriptionName"); + consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c"))); + int completionDelayMillis = 10; + Schema<byte[]> schema = Schema.BYTES; + PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(); + when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture( + new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis)); + CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>(); + MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor, + completeFuture, schema, null, true); + // assert that we don't start in closed, then we move to closed and get an exception + // indicating that closeAsync was called + assertEquals(impl.getState(), HandlerState.State.Uninitialized); + try { + completeFuture.get(15, TimeUnit.MILLISECONDS); + } catch (Throwable ex) { + // just ignore the exception + } + assertTrue(completeFuture.isCompletedExceptionally()); + assertEquals(impl.getConsumers().size(), 0); + assertEquals(impl.getState(), HandlerState.State.Closed); + verify(clientMock, times(1)).cleanupConsumer(any()); + } + }
