Vanlightly commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r693802693



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    protected void failPendingReceives() {

Review comment:
       Agreed, updating.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void completePendingReceive(CompletableFuture<Message<T>> receivedFutu
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    protected void failPendingReceives() {
         while (!pendingReceives.isEmpty()) {
             CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
             if (receiveFuture == null) {
                 break;
             }
             if (!receiveFuture.isDone()) {
                 receiveFuture.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
     }
 
-    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
-        while (!pendingBatchReceives.isEmpty()) {
-            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+    protected void failPendingBatchReceives() {

Review comment:
       Agreed, updating.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to