This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 14c580af58e [fix][client] Fix consumer leak when thread is interrupted 
before subscribe completes (#24100)
14c580af58e is described below

commit 14c580af58ec187a68c4206e5652e3d61496d1c1
Author: Baodi Shi <[email protected]>
AuthorDate: Fri Mar 21 15:41:44 2025 +0800

    [fix][client] Fix consumer leak when thread is interrupted before subscribe 
completes (#24100)
    
    (cherry picked from commit e51a639ba86b514202e88bfb8017c16b3d1a87e9)
---
 .../pulsar/client/impl/ConsumerCloseTest.java      | 91 ++++++++++++++++++++++
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 28 ++++++-
 2 files changed, 118 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
new file mode 100644
index 00000000000..3a446101990
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class ConsumerCloseTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testInterruptedWhenCreateConsumer() throws 
InterruptedException {
+
+        String tpName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        String subName = "test-sub";
+        String mlCursorPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + 
TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + subName;
+
+        // Make create cursor delay 1s
+        CountDownLatch topicLoadLatch = new CountDownLatch(1);
+        for (int i = 0; i < 5; i++) {
+            mockZooKeeper.delay(1000, (op, path) -> {
+                if (mlCursorPath.equals(path)) {
+                    topicLoadLatch.countDown();
+                    return true;
+                }
+                return false;
+            });
+        }
+
+        Thread startConsumer = new Thread(() -> {
+            try {
+                pulsarClient.newConsumer()
+                        .topic(tpName)
+                        .subscriptionName(subName)
+                        .subscribe();
+                Assert.fail("Should have thrown an exception");
+            } catch (PulsarClientException e) {
+                Assert.assertTrue(e.getCause() instanceof 
InterruptedException);
+            }
+        });
+        startConsumer.start();
+        topicLoadLatch.await();
+        startConsumer.interrupt();
+
+        PulsarClientImpl clientImpl = (PulsarClientImpl) pulsarClient;
+        Awaitility.await().ignoreExceptions().atMost(10, 
TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(clientImpl.consumersCount(), 0);
+        });
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index cc3b430db8e..bd62c440342 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
@@ -61,6 +62,7 @@ import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 @Getter(AccessLevel.PUBLIC)
 public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
@@ -68,6 +70,7 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
     private ConsumerConfigurationData<T> conf;
     private final Schema<T> schema;
     private List<ConsumerInterceptor<T>> interceptorList;
+    private volatile boolean interruptedBeforeConsumerCreation;
 
     private static final long MIN_ACK_TIMEOUT_MILLIS = 1000;
     private static final long MIN_TICK_TIME_MILLIS = 100;
@@ -97,8 +100,31 @@ public class ConsumerBuilderImpl<T> implements 
ConsumerBuilder<T> {
 
     @Override
     public Consumer<T> subscribe() throws PulsarClientException {
+        CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
         try {
-            return subscribeAsync().get();
+            subscribeAsync().whenComplete((c, e) -> {
+                if (e != null) {
+                    // If the subscription fails, there is no need to close 
the consumer here,
+                    // as it will be handled in the subscribeAsync method.
+                    future.completeExceptionally(e);
+                    return;
+                }
+                if (interruptedBeforeConsumerCreation) {
+                    c.closeAsync().exceptionally(closeEx -> {
+                        log.error("Failed to close consumer after 
interruption", closeEx.getCause());
+                        return null;
+                    });
+                    future.completeExceptionally(new PulsarClientException(
+                            "Subscription was interrupted before the consumer 
could be fully created"));
+                } else {
+                    future.complete(c);
+                }
+            });
+            return future.get();
+        } catch (InterruptedException e) {
+            interruptedBeforeConsumerCreation = true;
+            Thread.currentThread().interrupt();
+            throw PulsarClientException.unwrap(e);
         } catch (Exception e) {
             throw PulsarClientException.unwrap(e);
         }

Reply via email to