This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cbe9816 [client] Cleanup consumer on multitopic subscribe failure
(#9419)
cbe9816 is described below
commit cbe9816423a23686bdd2091192f5aa5f8bda152d
Author: Addison Higham <[email protected]>
AuthorDate: Fri Feb 12 13:56:32 2021 -0700
[client] Cleanup consumer on multitopic subscribe failure (#9419)
Currently, when a multi-topic subscribe fails (via a set of topics or a
regex) we can leave consumers connected, as the multitopic consumer
doesn't close any of the topics.
This means we rely on the client to call closeAsync, otherwise, the
consumer is left in partially open state.
This fix changes that, and ensures we call close in the case of an
exception
Co-authored-by: Sijie Guo <[email protected]>
---
.../client/impl/MultiTopicsConsumerImpl.java | 9 ++++--
.../pulsar/client/impl/ClientTestFixtures.java | 6 ++++
.../client/impl/MultiTopicsConsumerImplTest.java | 35 ++++++++++++++++++++++
3 files changed, 48 insertions(+), 2 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6cf6d3d..2c81d0a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -190,8 +190,13 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
})
.exceptionally(ex -> {
- log.warn("[{}] Failed to subscribe topics: {}", topic,
ex.getMessage());
- subscribeFuture.completeExceptionally(ex);
+ log.warn("[{}] Failed to subscribe topics: {}, closing
consumer", topic, ex.getMessage());
+ closeAsync().whenComplete((res, closeEx) -> {
+ if (closeEx != null) {
+ log.error("[{}] Failed to unsubscribe after failed
consumer creation: {}", topic, closeEx.getMessage());
+ }
+ subscribeFuture.completeExceptionally(ex);
+ });
return null;
});
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index a1653c1..085a4a9 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -72,6 +72,12 @@ class ClientTestFixtures {
return future;
}
+ static <T> CompletableFuture<T> createExceptionFuture(Throwable ex, int
delayMillis) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ SCHEDULER.schedule(() -> future.completeExceptionally(ex),
delayMillis, TimeUnit.MILLISECONDS);
+ return future;
+ }
+
public static ExecutorService createMockedExecutor() {
return mock(ExecutorService.class);
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 767cb65..664e78a 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import com.google.common.collect.Sets;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -31,13 +32,19 @@ import
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
import static
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
import static
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@@ -137,4 +144,32 @@ public class MultiTopicsConsumerImplTest {
assertFalse(consumer.hasPendingBatchReceive());
}
+ @Test
+ public void testConsumerCleanupOnSubscribeFailure() throws
InterruptedException, TimeoutException, ExecutionException {
+ ExecutorService listenerExecutor = mock(ExecutorService.class);
+ ConsumerConfigurationData<byte[]> consumerConfData = new
ConsumerConfigurationData<>();
+ consumerConfData.setSubscriptionName("subscriptionName");
+ consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b",
"c")));
+ int completionDelayMillis = 10;
+ Schema<byte[]> schema = Schema.BYTES;
+ PulsarClientImpl clientMock =
createPulsarClientMockWithMockedClientCnx();
+
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation ->
createExceptionFuture(
+ new PulsarClientException.InvalidConfigurationException("a
mock exception"), completionDelayMillis));
+ CompletableFuture<Consumer<byte[]>> completeFuture = new
CompletableFuture<>();
+ MultiTopicsConsumerImpl<byte[]> impl = new
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+ completeFuture, schema, null, true);
+ // assert that we don't start in closed, then we move to closed and
get an exception
+ // indicating that closeAsync was called
+ assertEquals(impl.getState(), HandlerState.State.Uninitialized);
+ try {
+ completeFuture.get(15, TimeUnit.MILLISECONDS);
+ } catch (Throwable ex) {
+ // just ignore the exception
+ }
+ assertTrue(completeFuture.isCompletedExceptionally());
+ assertEquals(impl.getConsumers().size(), 0);
+ assertEquals(impl.getState(), HandlerState.State.Closed);
+ verify(clientMock, times(1)).cleanupConsumer(any());
+ }
+
}