This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 57eccf4 Fix resource leak when producer creation fails (#13505)
57eccf4 is described below
commit 57eccf48e418e0243f586a945134785dbe9b5d08
Author: Aloys <[email protected]>
AuthorDate: Tue Dec 28 19:06:43 2021 +0800
Fix resource leak when producer creation fails (#13505)
Fixes #13214
### Motivation
When the client fails to create a producer because the connection failed, the
topic was terminated, or the producer was fenced, some resources are not
released in the client.
### Modifications
When producer creation fails:
1. cancel the sendTimeout task
2. cancel the batchTimerTask
3. cancel the keyGeneratorTask
4. cancel the stats timeout task
---
.../broker/service/ExclusiveProducerTest.java | 28 +++++++++++++++
.../broker/service/TopicTerminationTest.java | 28 +++++++++++++++
.../pulsar/client/impl/TopicDoesNotExistsTest.java | 1 +
.../apache/pulsar/client/impl/ProducerImpl.java | 42 +++++++++++++---------
4 files changed, 82 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 04da55d..4d2b16a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Producer;
@@ -33,8 +34,11 @@ import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import
org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -118,6 +122,30 @@ public class ExclusiveProducerTest extends BrokerTestBase {
}
@Test(dataProvider = "topics")
+ public void testProducerTasksCleanupWhenUsingExclusiveProducers(String
type, boolean partitioned) throws Exception {
+ String topic = newTopic(type, partitioned);
+ Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .create();
+
+ try {
+ pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .create();
+ fail("Should have failed");
+ } catch (ProducerFencedException e) {
+ // Expected
+ }
+
+ p1.close();
+
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(timer.pendingTimeouts(), 0));
+ }
+
+ @Test(dataProvider = "topics")
public void existingSharedProducer(String type, boolean partitioned)
throws Exception {
String topic = newTopic(type, partitioned);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index 0cd84f3..dedc990 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -31,6 +31,8 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+
+import io.netty.util.HashedWheelTimer;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -41,8 +43,10 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -108,6 +112,30 @@ public class TopicTerminationTest extends BrokerTestBase {
}
}
+ public void testCreatingProducerTasksCleanupWhenOnTerminatedTopic() throws
Exception {
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ producer.send("msg-1".getBytes());
+ producer.send("msg-2".getBytes());
+ MessageId msgId3 = producer.send("msg-3".getBytes());
+
+ MessageId lastMessageId =
admin.topics().terminateTopicAsync(topicName).get();
+ assertEquals(lastMessageId, msgId3);
+ producer.close();
+
+ try {
+ pulsarClient.newProducer().topic(topicName).create();
+ fail("Should have thrown exception");
+ } catch (PulsarClientException.TopicTerminatedException e) {
+ // Expected
+ }
+ HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
+ Awaitility.await().untilAsserted(() ->
Assert.assertEquals(timer.pendingTimeouts(), 0));
+ }
+
@Test(timeOut = 20000)
public void testTerminateWhilePublishing() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index 44e1b39..1fb42d5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -66,6 +66,7 @@ public class TopicDoesNotExistsTest extends
ProducerConsumerBase {
}
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
+ Assert.assertEquals(timer.pendingTimeouts(), 0);
Assert.assertEquals(((PulsarClientImpl)
pulsarClient).producersCount(), 0);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1d44fcf..4e29690 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -892,23 +892,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return CompletableFuture.completedFuture(null);
}
- Timeout timeout = sendTimeout;
- if (timeout != null) {
- timeout.cancel();
- sendTimeout = null;
- }
-
- ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
- if (batchTimerTask != null) {
- batchTimerTask.cancel(false);
- this.batchTimerTask = null;
- }
-
- if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
- keyGeneratorTask.cancel(false);
- }
-
- stats.cancelStatsTimeout();
+ closeProducerTasks();
ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
@@ -1575,6 +1559,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
failPendingMessages(cnx(), (PulsarClientException)
cause);
}
producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
client.cleanupProducer(this);
} else if (cause instanceof
PulsarClientException.ProducerFencedException) {
setState(State.ProducerFenced);
@@ -1582,6 +1567,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
failPendingMessages(cnx(), (PulsarClientException)
cause);
}
producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
client.cleanupProducer(this);
} else if (producerCreatedFuture.isDone() || //
(cause instanceof PulsarClientException &&
PulsarClientException.isRetriableError(cause)
@@ -1592,6 +1578,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
} else {
setState(State.Failed);
producerCreatedFuture.completeExceptionally(cause);
+ closeProducerTasks();
client.cleanupProducer(this);
Timeout timeout = sendTimeout;
if (timeout != null) {
@@ -1616,6 +1603,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
} else {
log.info("[{}] Producer creation failed for producer {}
after producerTimeout", topic, producerId);
}
+ closeProducerTasks();
setState(State.Failed);
client.cleanupProducer(this);
}
@@ -1624,6 +1612,26 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
}
+ private void closeProducerTasks() {
+ Timeout timeout = sendTimeout;
+ if (timeout != null) {
+ timeout.cancel();
+ sendTimeout = null;
+ }
+
+ ScheduledFuture<?> batchTimerTask = this.batchTimerTask;
+ if (batchTimerTask != null) {
+ batchTimerTask.cancel(false);
+ this.batchTimerTask = null;
+ }
+
+ if (keyGeneratorTask != null && !keyGeneratorTask.isCancelled()) {
+ keyGeneratorTask.cancel(false);
+ }
+
+ stats.cancelStatsTimeout();
+ }
+
private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (this) {