michaeljmarshall commented on a change in pull request #10159:
URL: https://github.com/apache/pulsar/pull/10159#discussion_r609239453



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -938,6 +938,21 @@ private void closeConsumerTasks() {
         if (possibleSendToDeadLetterTopicMessages != null) {
             possibleSendToDeadLetterTopicMessages.clear();
         }
+
+        if (deadLetterProducer != null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer != null) {
+                    deadLetterProducer.thenApplyAsync(Producer::closeAsync);

Review comment:
       I wasn't sure whether this should be closed synchronously or 
asynchronously (I called this out in the PR's description). Thank you for this 
feedback. I am okay with changing the behavior, but first I want to explain why 
I originally proposed it this way. 
   
   Notice that on the line right after my addition, we call 
`acknowledgmentsGroupingTracker.close();`, which ultimately calls the 
`flushAsync()` method in the `PersistentAcknowledgmentsGroupingTracker` class. 
That method returns a future, but we neither capture it nor wait for its 
completion. Further, this `closeConsumerTasks` method is called by the 
consumer's `closeAsync` method and runs on the calling thread, so closing the 
producer synchronously here would block the end user calling `closeAsync`. 
That seems suboptimal to me.
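
   To illustrate the calling-thread concern, here is a minimal sketch. It is not the Pulsar code: `CallingThreadClose`, `closeTasks`, and this `closeAsync` are hypothetical stand-ins for the consumer's cleanup path. It only shows that work done before the future is returned runs on the caller's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CallingThreadClose {
    // Records which thread actually ran the cleanup.
    static final AtomicReference<Thread> cleanupThread = new AtomicReference<>();

    // Hypothetical stand-in for closeConsumerTasks(): plain synchronous work.
    static void closeTasks() {
        cleanupThread.set(Thread.currentThread());
    }

    // Hypothetical stand-in for the consumer's closeAsync(): the cleanup runs
    // to completion before the future is handed back to the caller.
    static CompletableFuture<Void> closeAsync() {
        closeTasks();
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        closeAsync().join();
        // true: the cleanup ran on the thread that called closeAsync()
        System.out.println(cleanupThread.get() == Thread.currentThread());
    }
}
```

   Any blocking call placed inside `closeTasks` in this sketch would therefore delay the caller of `closeAsync` by the same amount.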
   
   An alternative approach could be to use futures here to close resources 
asynchronously and then use callbacks to enforce the proper ordering of those 
closures.
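
   A minimal sketch of that idea, assuming each resource exposes a `closeAsync()` that returns a `CompletableFuture<Void>`. The `AsyncCloseable` interface, the `named` helper, and the resource names are hypothetical and only for illustration; they are not Pulsar APIs. Chaining with `thenCompose` starts each close only after the previous one completes and yields a single future the caller can wait on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OrderedAsyncClose {
    // Hypothetical stand-in for any resource with an asynchronous close.
    interface AsyncCloseable {
        CompletableFuture<Void> closeAsync();
    }

    // Records the order in which resources were actually closed.
    static final List<String> closeOrder =
            Collections.synchronizedList(new ArrayList<>());

    // Builds a fake resource whose close runs on another thread.
    static AsyncCloseable named(String name) {
        return () -> CompletableFuture.runAsync(() -> closeOrder.add(name));
    }

    // Closes the resources one after another without blocking the caller.
    static CompletableFuture<Void> closeInOrder(List<AsyncCloseable> resources) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (AsyncCloseable resource : resources) {
            // thenCompose waits for the previous close, then starts the next.
            chain = chain.thenCompose(ignored -> resource.closeAsync());
        }
        return chain;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = closeInOrder(
                Arrays.asList(named("ackTracker"), named("deadLetterProducer")));
        done.join(); // only the demo blocks; real callers could attach callbacks
        System.out.println(closeOrder);
    }
}
```

   The order passed to `closeInOrder` is arbitrary here; which resource should be closed first is exactly the design question raised above.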
   
   @merlimat - let me know what you think. Thanks!





