This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77c66b3911e261c40eab86b7890dede6b736559e
Author: Aloys <[email protected]>
AuthorDate: Tue Dec 28 19:06:43 2021 +0800

    Fix reousrce leak when create producer failed (#13505)
    
    Fixes #13214
    
    When client create producer failed caused by connection failed, topic 
terminated, or produce fenced. There are some resources that are not released 
in the client.
    
    When creating producer failed.
    1. stop the sendTimout  task
    2. cancel the batchTimerTask
    3. cancel the keyGeneratorTask
    4. cancel the statTimeout task
    
    (cherry picked from commit 57eccf48e418e0243f586a945134785dbe9b5d08)
---
 .../broker/service/ExclusiveProducerTest.java      | 28 +++++++++++++++
 .../broker/service/TopicTerminationTest.java       | 29 +++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 42 +++++++++++++---------
 3 files changed, 82 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 04da55d..4d2b16a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 
 import org.apache.pulsar.client.api.Producer;
@@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
 import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase {
     }
 
     @Test(dataProvider = "topics")
+    public void testProducerTasksCleanupWhenUsingExclusiveProducers(String 
type, boolean partitioned) throws Exception {
+        String topic = newTopic(type, partitioned);
+        Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .create();
+
+        try {
+            pulsarClient.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .accessMode(ProducerAccessMode.Exclusive)
+                    .create();
+            fail("Should have failed");
+        } catch (ProducerFencedException e) {
+            // Expected
+        }
+
+        p1.close();
+
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(timer.pendingTimeouts(), 0));
+    }
+
+    @Test(dataProvider = "topics")
     public void existingSharedProducer(String type, boolean partitioned) 
throws Exception {
         String topic = newTopic(type, partitioned);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index a7a1111..8499524 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+
+import io.netty.util.HashedWheelTimer;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -41,7 +43,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -107,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase {
         }
     }
 
+    public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws 
Exception {
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        producer.send("msg-1".getBytes());
+        producer.send("msg-2".getBytes());
+        MessageId msgId3 = producer.send("msg-3".getBytes());
+
+        MessageId lastMessageId = 
admin.topics().terminateTopicAsync(topicName).get();
+        assertEquals(lastMessageId, msgId3);
+        producer.close();
+
+        try {
+            pulsarClient.newProducer().topic(topicName).create();
+            fail("Should have thrown exception");
+        } catch (PulsarClientException.TopicTerminatedException e) {
+            // Expected
+        }
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(timer.pendingTimeouts(), 0));
+    }
+
     @Test(timeOut = 20000)
     public void testTerminateWhilePublishing() throws Exception {
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a5baba7..78aaf3c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -866,23 +866,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return CompletableFuture.completedFuture(null);
         }
 
-        Timeout timeout = sendTimeout;
-        if (timeout != null) {
-            timeout.cancel();
-            sendTimeout = null;
-        }
-
-        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
-        if (batchTimerTask != null) {
-            batchTimerTask.cancel(false);
-            this.batchTimerTask = null;
-        }
-
-        if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
-            keyGeneratorTask.cancel(false);
-        }
-
-        stats.cancelStatsTimeout();
+        closeProducerTasks();
 
         ClientCnx cnx = cnx();
         if (cnx == null || currentState != State.Ready) {
@@ -1544,6 +1528,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             failPendingMessages(cnx(), (PulsarClientException) 
cause);
                         }
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                     } else if (cause instanceof 
PulsarClientException.ProducerFencedException) {
                         setState(State.ProducerFenced);
@@ -1551,6 +1536,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             failPendingMessages(cnx(), (PulsarClientException) 
cause);
                         }
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
                     (cause instanceof PulsarClientException && 
PulsarClientException.isRetriableError(cause)
@@ -1561,6 +1547,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     } else {
                         setState(State.Failed);
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                         Timeout timeout = sendTimeout;
                         if (timeout != null) {
@@ -1583,11 +1570,32 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             } else {
                 log.info("[{}] Producer creation failed for producer {} after 
producerTimeout", topic, producerId);
             }
+            closeProducerTasks();
             setState(State.Failed);
             client.cleanupProducer(this);
         }
     }
 
+    private void closeProducerTasks() {
+        Timeout timeout = sendTimeout;
+        if (timeout != null) {
+            timeout.cancel();
+            sendTimeout = null;
+        }
+
+        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+        if (batchTimerTask != null) {
+            batchTimerTask.cancel(false);
+            this.batchTimerTask = null;
+        }
+
+        if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
+            keyGeneratorTask.cancel(false);
+        }
+
+        stats.cancelStatsTimeout();
+    }
+
     private void resendMessages(ClientCnx cnx, long expectedEpoch) {
         cnx.ctx().channel().eventLoop().execute(() -> {
             synchronized (this) {

Reply via email to