This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fc3ab2eaca4b7154b61b18aa360efc8d99a06999 Author: Aloys <[email protected]> AuthorDate: Tue Dec 28 19:06:43 2021 +0800 Fix resource leak when creating a producer fails (#13505) Fixes #13214 ### Motivation When the client fails to create a producer because the connection failed, the topic was terminated, or the producer was fenced, some resources are not released in the client. ### Modifications When producer creation fails: 1. stop the sendTimeout task 2. cancel the batchTimerTask 3. cancel the keyGeneratorTask 4. cancel the stats timeout task (cherry picked from commit 57eccf48e418e0243f586a945134785dbe9b5d08) --- .../broker/service/ExclusiveProducerTest.java | 28 +++++++++++++++ .../broker/service/TopicTerminationTest.java | 28 +++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 42 +++++++++++++--------- 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 04da55d..4d2b16a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import io.netty.util.HashedWheelTimer; import lombok.Cleanup; import org.apache.pulsar.client.api.Producer; @@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.powermock.reflect.Whitebox; +import 
org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase { } @Test(dataProvider = "topics") + public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boolean partitioned) throws Exception { + String topic = newTopic(type, partitioned); + Producer<String> p1 = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.Exclusive) + .create(); + + try { + pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.Exclusive) + .create(); + fail("Should have failed"); + } catch (ProducerFencedException e) { + // Expected + } + + p1.close(); + + HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0)); + } + + @Test(dataProvider = "topics") public void existingSharedProducer(String type, boolean partitioned) throws Exception { String topic = newTopic(type, partitioned); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java index 0cd84f3..dedc990 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java @@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; + +import io.netty.util.HashedWheelTimer; import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -41,8 +43,10 @@ import 
org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -108,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase { } } + public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws Exception { + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + producer.send("msg-1".getBytes()); + producer.send("msg-2".getBytes()); + MessageId msgId3 = producer.send("msg-3".getBytes()); + + MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get(); + assertEquals(lastMessageId, msgId3); + producer.close(); + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("Should have thrown exception"); + } catch (PulsarClientException.TopicTerminatedException e) { + // Expected + } + HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0)); + } + @Test(timeOut = 20000) public void testTerminateWhilePublishing() throws Exception { Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 5c22f13..31a32da 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ 
-872,23 +872,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return CompletableFuture.completedFuture(null); } - Timeout timeout = sendTimeout; - if (timeout != null) { - timeout.cancel(); - sendTimeout = null; - } - - ScheduledFuture<?> batchTimerTask = this.batchTimerTask; - if (batchTimerTask != null) { - batchTimerTask.cancel(false); - this.batchTimerTask = null; - } - - if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) { - keyGeneratorTask.cancel(false); - } - - stats.cancelStatsTimeout(); + closeProducerTasks(); ClientCnx cnx = cnx(); if (cnx == null || currentState != State.Ready) { @@ -1558,6 +1542,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne failPendingMessages(cnx(), (PulsarClientException) cause); } producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); client.cleanupProducer(this); } else if (cause instanceof PulsarClientException.ProducerFencedException) { setState(State.ProducerFenced); @@ -1565,6 +1550,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne failPendingMessages(cnx(), (PulsarClientException) cause); } producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || // (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) @@ -1575,6 +1561,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } else { setState(State.Failed); producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); client.cleanupProducer(this); Timeout timeout = sendTimeout; if (timeout != null) { @@ -1599,6 +1586,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } else { log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId); } + closeProducerTasks(); setState(State.Failed); 
client.cleanupProducer(this); } @@ -1607,6 +1595,26 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } } + private void closeProducerTasks() { + Timeout timeout = sendTimeout; + if (timeout != null) { + timeout.cancel(); + sendTimeout = null; + } + + ScheduledFuture<?> batchTimerTask = this.batchTimerTask; + if (batchTimerTask != null) { + batchTimerTask.cancel(false); + this.batchTimerTask = null; + } + + if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) { + keyGeneratorTask.cancel(false); + } + + stats.cancelStatsTimeout(); + } + private void resendMessages(ClientCnx cnx, long expectedEpoch) { cnx.ctx().channel().eventLoop().execute(() -> { synchronized (this) {
