This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56839d422287f63bcc3492538cf623a0f6fb7245
Author: Addison Higham <[email protected]>
AuthorDate: Fri Feb 12 13:56:32 2021 -0700

    [client] Cleanup consumer on multitopic subscribe failure (#9419)
    
    Currently, when a multi-topic subscribe fails (via a set of topics or a
    regex) we can leave consumers connected, as the multitopic consumer
    doesn't close any of the topics.
    
    This means we rely on the client to call closeAsync; otherwise, the
    consumer is left in a partially open state.
    
    This fix changes that and ensures we call close in the case of an
    exception.
    
    Co-authored-by: Sijie Guo <[email protected]>
    (cherry picked from commit cbe9816423a23686bdd2091192f5aa5f8bda152d)
---
 .../client/impl/MultiTopicsConsumerImpl.java       |  9 ++++--
 .../pulsar/client/impl/ClientTestFixtures.java     |  6 ++++
 .../client/impl/MultiTopicsConsumerImplTest.java   | 35 ++++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 55ee806..8efab22 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -189,8 +189,13 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                 subscribeFuture().complete(MultiTopicsConsumerImpl.this);
             })
             .exceptionally(ex -> {
-                log.warn("[{}] Failed to subscribe topics: {}", topic, 
ex.getMessage());
-                subscribeFuture.completeExceptionally(ex);
+                log.warn("[{}] Failed to subscribe topics: {}, closing 
consumer", topic, ex.getMessage());
+                closeAsync().whenComplete((res, closeEx) -> {
+                    if (closeEx != null) {
+                        log.error("[{}] Failed to unsubscribe after failed 
consumer creation: {}", topic, closeEx.getMessage());
+                    }
+                    subscribeFuture.completeExceptionally(ex);
+                });
                 return null;
             });
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index a1653c1..085a4a9 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -72,6 +72,12 @@ class ClientTestFixtures {
         return future;
     }
 
+    static <T> CompletableFuture<T> createExceptionFuture(Throwable ex, int 
delayMillis) {
+        CompletableFuture<T> future = new CompletableFuture<>();
+        SCHEDULER.schedule(() -> future.completeExceptionally(ex), 
delayMillis, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
     public static ExecutorService createMockedExecutor() {
         return mock(ExecutorService.class);
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 767cb65..664e78a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Sets;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -31,13 +32,19 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.testng.annotations.Test;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
 
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createDelayedCompletedFuture;
+import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createExceptionFuture;
 import static 
org.apache.pulsar.client.impl.ClientTestFixtures.createPulsarClientMockWithMockedClientCnx;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
@@ -137,4 +144,32 @@ public class MultiTopicsConsumerImplTest {
         assertFalse(consumer.hasPendingBatchReceive());
     }
 
+    @Test
+    public void testConsumerCleanupOnSubscribeFailure() throws 
InterruptedException, TimeoutException, ExecutionException {
+        ExecutorService listenerExecutor = mock(ExecutorService.class);
+        ConsumerConfigurationData<byte[]> consumerConfData = new 
ConsumerConfigurationData<>();
+        consumerConfData.setSubscriptionName("subscriptionName");
+        consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", 
"c")));
+        int completionDelayMillis = 10;
+        Schema<byte[]> schema = Schema.BYTES;
+        PulsarClientImpl clientMock = 
createPulsarClientMockWithMockedClientCnx();
+        
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> 
createExceptionFuture(
+                new PulsarClientException.InvalidConfigurationException("a 
mock exception"), completionDelayMillis));
+        CompletableFuture<Consumer<byte[]>> completeFuture = new 
CompletableFuture<>();
+        MultiTopicsConsumerImpl<byte[]> impl = new 
MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, listenerExecutor,
+                completeFuture, schema, null, true);
+        // assert that we don't start in closed, then we move to closed and 
get an exception
+        // indicating that closeAsync was called
+        assertEquals(impl.getState(), HandlerState.State.Uninitialized);
+        try {
+            completeFuture.get(15, TimeUnit.MILLISECONDS);
+        } catch (Throwable ex) {
+            // just ignore the exception
+        }
+        assertTrue(completeFuture.isCompletedExceptionally());
+        assertEquals(impl.getConsumers().size(), 0);
+        assertEquals(impl.getState(), HandlerState.State.Closed);
+        verify(clientMock, times(1)).cleanupConsumer(any());
+    }
+
 }

Reply via email to