This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 77c66b3911e261c40eab86b7890dede6b736559e Author: Aloys <[email protected]> AuthorDate: Tue Dec 28 19:06:43 2021 +0800 Fix reousrce leak when create producer failed (#13505) Fixes #13214 When client create producer failed caused by connection failed, topic terminated, or produce fenced. There are some resources that are not released in the client. When creating producer failed. 1. stop the sendTimout task 2. cancel the batchTimerTask 3. cancel the keyGeneratorTask 4. cancel the statTimeout task (cherry picked from commit 57eccf48e418e0243f586a945134785dbe9b5d08) --- .../broker/service/ExclusiveProducerTest.java | 28 +++++++++++++++ .../broker/service/TopicTerminationTest.java | 29 +++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 42 +++++++++++++--------- 3 files changed, 82 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 04da55d..4d2b16a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import io.netty.util.HashedWheelTimer; import lombok.Cleanup; import org.apache.pulsar.client.api.Producer; @@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.powermock.reflect.Whitebox; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase { } @Test(dataProvider = "topics") + public void testProducerTasksCleanupWhenUsingExclusiveProducers(String type, boolean partitioned) throws Exception { + String topic = newTopic(type, partitioned); + Producer<String> p1 = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.Exclusive) + .create(); + + try { + pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .accessMode(ProducerAccessMode.Exclusive) + .create(); + fail("Should have failed"); + } catch (ProducerFencedException e) { + // Expected + } + + p1.close(); + + HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0)); + } + + @Test(dataProvider = "topics") public void existingSharedProducer(String type, boolean partitioned) throws Exception { String topic = newTopic(type, partitioned); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java index a7a1111..8499524 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java @@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; + +import io.netty.util.HashedWheelTimer; import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -41,7 +43,10 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderListener; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -107,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase { } } + public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws Exception { + Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + producer.send("msg-1".getBytes()); + producer.send("msg-2".getBytes()); + MessageId msgId3 = producer.send("msg-3".getBytes()); + + MessageId lastMessageId = admin.topics().terminateTopicAsync(topicName).get(); + assertEquals(lastMessageId, msgId3); + producer.close(); + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("Should have thrown exception"); + } catch (PulsarClientException.TopicTerminatedException e) { + // Expected + } + HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(timer.pendingTimeouts(), 0)); + } + @Test(timeOut = 20000) public void testTerminateWhilePublishing() throws Exception { Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index a5baba7..78aaf3c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -866,23 +866,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return CompletableFuture.completedFuture(null); } - Timeout timeout = sendTimeout; - if (timeout != null) { - timeout.cancel(); - sendTimeout = null; - } - - ScheduledFuture<?> batchTimerTask = this.batchTimerTask; - if (batchTimerTask != null) { - batchTimerTask.cancel(false); - this.batchTimerTask = null; - } - - if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) { - keyGeneratorTask.cancel(false); - } - - stats.cancelStatsTimeout(); + closeProducerTasks(); ClientCnx cnx = cnx(); if (cnx == null || currentState != State.Ready) { @@ -1544,6 +1528,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne failPendingMessages(cnx(), (PulsarClientException) cause); } producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); client.cleanupProducer(this); } else if (cause instanceof PulsarClientException.ProducerFencedException) { setState(State.ProducerFenced); @@ -1551,6 +1536,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne failPendingMessages(cnx(), (PulsarClientException) cause); } producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); client.cleanupProducer(this); } else if (producerCreatedFuture.isDone() || // (cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) @@ -1561,6 +1547,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } else { setState(State.Failed); producerCreatedFuture.completeExceptionally(cause); + closeProducerTasks(); client.cleanupProducer(this); Timeout timeout = sendTimeout; if (timeout != null) { @@ -1583,11 +1570,32 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } else { log.info("[{}] Producer creation failed for producer {} after producerTimeout", topic, producerId); } + closeProducerTasks(); setState(State.Failed); client.cleanupProducer(this); } } + private void closeProducerTasks() { + Timeout timeout = sendTimeout; + if (timeout != null) { + timeout.cancel(); + sendTimeout = null; + } + + ScheduledFuture<?> batchTimerTask = this.batchTimerTask; + if (batchTimerTask != null) { + batchTimerTask.cancel(false); + this.batchTimerTask = null; + } + + if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) { + keyGeneratorTask.cancel(false); + } + + stats.cancelStatsTimeout(); + } + private void resendMessages(ClientCnx cnx, long expectedEpoch) { cnx.ctx().channel().eventLoop().execute(() -> { synchronized (this) {
