This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 8eebe597705 [fix][client] Release the orphan producers after the
primary consumer is closed (#19858)
8eebe597705 is described below
commit 8eebe597705e744a93133910312abd4f6d721d6d
Author: fengyubiao <[email protected]>
AuthorDate: Thu Apr 6 12:59:34 2023 +0800
[fix][client] Release the orphan producers after the primary consumer is
closed (#19858)
Motivation: The producers ["retryLetterProducer", "deadLetterProducer"]
are auto-created by a consumer when `DLQ` is enabled, but these producers are
not closed when the consumer is closed.
Modifications: Automatically close "retryLetterProducer" and
"deadLetterProducer" when the primary consumer is closed.
(cherry picked from commit 94ae340f94e004fb1de76a2d05e1e7160bb8eff1)
---
.../apache/pulsar/client/api/RetryTopicTest.java | 48 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 18 +++++++-
2 files changed, 65 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index d8a97b7bdb1..56dfc604b56 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -516,4 +517,51 @@ public class RetryTopicTest extends ProducerConsumerBase {
consumer.close();
}
+
+ @Test(timeOut = 30000L)
+ public void testRetryProducerWillCloseByConsumer() throws Exception {
+ final String topicName = "persistent://my-property/my-ns/tp_" +
UUID.randomUUID().toString();
+ final String subscriptionName = "sub1";
+ final String topicRetry = topicName + "-" + subscriptionName +
"-RETRY";
+ final String topicDLQ = topicName + "-" + subscriptionName + "-DLQ";
+
+ // Trigger the DLQ and retry topic creation.
+ final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .enableRetry(true)
+
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic(topicDLQ).maxRedeliverCount(2).build())
+ .receiverQueueSize(100)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ final Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .create();
+ // send messages.
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage()
+ .value("msg-" + i)
+ .sendAsync();
+ }
+ producer.flush();
+ for (int i = 0; i < 20; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ if (msg != null) {
+ consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
+ } else {
+ break;
+ }
+ }
+
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topicName, false);
+
+ // Verify: "retryLetterProducer" and "deadLetterProducer" will be
closed by "consumer.close()", so these two
+ // topics can be deleted successfully.
+ admin.topics().delete(topicRetry, false);
+ admin.topics().delete(topicDLQ, false);
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c7af6d3589c..101d42a6694 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,7 +1085,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
});
}
- return closeFuture;
+ ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
+ closeFutures.add(closeFuture);
+ if (retryLetterProducer != null) {
+
closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("Exception ignored in closing retryLetterProducer
of consumer", ex);
+ }
+ }));
+ }
+ if (deadLetterProducer != null) {
+ closeFutures.add(deadLetterProducer.thenCompose(p ->
p.closeAsync()).whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("Exception ignored in closing deadLetterProducer
of consumer", ex);
+ }
+ }));
+ }
+ return FutureUtil.waitForAll(closeFutures);
}
private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable
exception) {