This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8eebe597705 [fix][client] Release the orphan producers after the 
primary consumer is closed (#19858)
8eebe597705 is described below

commit 8eebe597705e744a93133910312abd4f6d721d6d
Author: fengyubiao <[email protected]>
AuthorDate: Thu Apr 6 12:59:34 2023 +0800

    [fix][client] Release the orphan producers after the primary consumer is 
closed (#19858)
    
    Motivation: The producers ["retryLetterProducer", "deadLetterProducer"] 
will be auto-created by consumers if enabled `DLQ`, but these producers will 
not close after consumers are closed.
    
    Modifications: Auto close "retryLetterProducer" and "deadLetterProducer" 
after the primary consumer is closed
    (cherry picked from commit 94ae340f94e004fb1de76a2d05e1e7160bb8eff1)
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 48 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 18 +++++++-
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index d8a97b7bdb1..56dfc604b56 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -516,4 +517,51 @@ public class RetryTopicTest extends ProducerConsumerBase {
         consumer.close();
     }
 
+
+    @Test(timeOut = 30000L)
+    public void testRetryProducerWillCloseByConsumer() throws Exception {
+        final String topicName = "persistent://my-property/my-ns/tp_" + 
UUID.randomUUID().toString();
+        final String subscriptionName = "sub1";
+        final String topicRetry = topicName + "-" + subscriptionName + 
"-RETRY";
+        final String topicDLQ = topicName + "-" + subscriptionName + "-DLQ";
+
+        // Trigger the DLQ and retry topic creation.
+        final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().deadLetterTopic(topicDLQ).maxRedeliverCount(2).build())
+                .receiverQueueSize(100)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        final Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .create();
+        // send messages.
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .sendAsync();
+        }
+        producer.flush();
+        for (int i = 0; i < 20; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg != null) {
+                consumer.reconsumeLater(msg, 1, TimeUnit.SECONDS);
+            } else {
+                break;
+            }
+        }
+
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, false);
+
+        // Verify: "retryLetterProducer" and "deadLetterProducer" will be 
closed by "consumer.close()", so these two
+        //  topics can be deleted successfully.
+        admin.topics().delete(topicRetry, false);
+        admin.topics().delete(topicDLQ, false);
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c7af6d3589c..101d42a6694 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,7 +1085,23 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             });
         }
 
-        return closeFuture;
+        ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
+        closeFutures.add(closeFuture);
+        if (retryLetterProducer != null) {
+            
closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
+                if (ex != null) {
+                    log.warn("Exception ignored in closing retryLetterProducer 
of consumer", ex);
+                }
+            }));
+        }
+        if (deadLetterProducer != null) {
+            closeFutures.add(deadLetterProducer.thenCompose(p -> 
p.closeAsync()).whenComplete((ignore, ex) -> {
+                if (ex != null) {
+                    log.warn("Exception ignored in closing deadLetterProducer 
of consumer", ex);
+                }
+            }));
+        }
+        return FutureUtil.waitForAll(closeFutures);
     }
 
     private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable 
exception) {

Reply via email to