This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 57eccf4  Fix resource leak when create producer failed (#13505)
57eccf4 is described below

commit 57eccf48e418e0243f586a945134785dbe9b5d08
Author: Aloys <[email protected]>
AuthorDate: Tue Dec 28 19:06:43 2021 +0800

    Fix resource leak when create producer failed (#13505)
    
    Fixes #13214
    
    ### Motivation
    
    When the client fails to create a producer because the connection failed,
the topic was terminated, or the producer was fenced, some resources are not
released in the client.
    
    ### Modifications
    
    When producer creation fails:
    1. stop the sendTimeout task
    2. cancel the batchTimerTask
    3. cancel the keyGeneratorTask
    4. cancel the stats timeout task
---
 .../broker/service/ExclusiveProducerTest.java      | 28 +++++++++++++++
 .../broker/service/TopicTerminationTest.java       | 28 +++++++++++++++
 .../pulsar/client/impl/TopicDoesNotExistsTest.java |  1 +
 .../apache/pulsar/client/impl/ProducerImpl.java    | 42 +++++++++++++---------
 4 files changed, 82 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 04da55d..4d2b16a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 
 import org.apache.pulsar.client.api.Producer;
@@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
 import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase {
     }
 
     @Test(dataProvider = "topics")
+    public void testProducerTasksCleanupWhenUsingExclusiveProducers(String 
type, boolean partitioned) throws Exception {
+        String topic = newTopic(type, partitioned);
+        Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .create();
+
+        try {
+            pulsarClient.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .accessMode(ProducerAccessMode.Exclusive)
+                    .create();
+            fail("Should have failed");
+        } catch (ProducerFencedException e) {
+            // Expected
+        }
+
+        p1.close();
+
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(timer.pendingTimeouts(), 0));
+    }
+
+    @Test(dataProvider = "topics")
     public void existingSharedProducer(String type, boolean partitioned) 
throws Exception {
         String topic = newTopic(type, partitioned);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 0cd84f3..dedc990 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+
+import io.netty.util.HashedWheelTimer;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -41,8 +43,10 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -108,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase {
         }
     }
 
+    public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws 
Exception {
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        producer.send("msg-1".getBytes());
+        producer.send("msg-2".getBytes());
+        MessageId msgId3 = producer.send("msg-3".getBytes());
+
+        MessageId lastMessageId = 
admin.topics().terminateTopicAsync(topicName).get();
+        assertEquals(lastMessageId, msgId3);
+        producer.close();
+
+        try {
+            pulsarClient.newProducer().topic(topicName).create();
+            fail("Should have thrown exception");
+        } catch (PulsarClientException.TopicTerminatedException e) {
+            // Expected
+        }
+        HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(timer.pendingTimeouts(), 0));
+    }
+
     @Test(timeOut = 20000)
     public void testTerminateWhilePublishing() throws Exception {
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index 44e1b39..1fb42d5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -66,6 +66,7 @@ public class TopicDoesNotExistsTest extends 
ProducerConsumerBase {
         }
         Thread.sleep(2000);
         HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) 
pulsarClient).timer();
+        Assert.assertEquals(timer.pendingTimeouts(), 0);
         Assert.assertEquals(((PulsarClientImpl) 
pulsarClient).producersCount(), 0);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1d44fcf..4e29690 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -892,23 +892,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return CompletableFuture.completedFuture(null);
         }
 
-        Timeout timeout = sendTimeout;
-        if (timeout != null) {
-            timeout.cancel();
-            sendTimeout = null;
-        }
-
-        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
-        if (batchTimerTask != null) {
-            batchTimerTask.cancel(false);
-            this.batchTimerTask = null;
-        }
-
-        if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
-            keyGeneratorTask.cancel(false);
-        }
-
-        stats.cancelStatsTimeout();
+        closeProducerTasks();
 
         ClientCnx cnx = cnx();
         if (cnx == null || currentState != State.Ready) {
@@ -1575,6 +1559,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             failPendingMessages(cnx(), (PulsarClientException) 
cause);
                         }
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                     } else if (cause instanceof 
PulsarClientException.ProducerFencedException) {
                         setState(State.ProducerFenced);
@@ -1582,6 +1567,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                             failPendingMessages(cnx(), (PulsarClientException) 
cause);
                         }
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                     } else if (producerCreatedFuture.isDone() || //
                                (cause instanceof PulsarClientException && 
PulsarClientException.isRetriableError(cause)
@@ -1592,6 +1578,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                     } else {
                         setState(State.Failed);
                         producerCreatedFuture.completeExceptionally(cause);
+                        closeProducerTasks();
                         client.cleanupProducer(this);
                         Timeout timeout = sendTimeout;
                         if (timeout != null) {
@@ -1616,6 +1603,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 } else {
                     log.info("[{}] Producer creation failed for producer {} 
after producerTimeout", topic, producerId);
                 }
+                closeProducerTasks();
                 setState(State.Failed);
                 client.cleanupProducer(this);
             }
@@ -1624,6 +1612,26 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
     }
 
+    private void closeProducerTasks() {
+        Timeout timeout = sendTimeout;
+        if (timeout != null) {
+            timeout.cancel();
+            sendTimeout = null;
+        }
+
+        ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+        if (batchTimerTask != null) {
+            batchTimerTask.cancel(false);
+            this.batchTimerTask = null;
+        }
+
+        if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
+            keyGeneratorTask.cancel(false);
+        }
+
+        stats.cancelStatsTimeout();
+    }
+
     private void resendMessages(ClientCnx cnx, long expectedEpoch) {
         cnx.ctx().channel().eventLoop().execute(() -> {
             synchronized (this) {

Reply via email to