Vanlightly commented on a change in pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#discussion_r693802693
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void
completePendingReceive(CompletableFuture<Message<T>> receivedFutu
});
}
- protected void
failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives) {
+ protected CompletableFuture<Void> failPendingReceive() {
+ if (internalPinnedExecutor.isShutdown()) {
+ // we need to fail any pending receives no matter what,
+ // to avoid blocking user code
+ failPendingReceives();
+ failPendingBatchReceives();
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ try {
+ failPendingReceives();
+ failPendingBatchReceives();
+ } finally {
+ future.complete(null);
+ }
+ });
+ return future;
+ }
+ }
+
+ protected void failPendingReceives() {
Review comment:
Agreed, updating.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -244,29 +230,52 @@ protected void
completePendingReceive(CompletableFuture<Message<T>> receivedFutu
});
}
- protected void
failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>>
pendingReceives) {
+ protected CompletableFuture<Void> failPendingReceive() {
+ if (internalPinnedExecutor.isShutdown()) {
+ // we need to fail any pending receives no matter what,
+ // to avoid blocking user code
+ failPendingReceives();
+ failPendingBatchReceives();
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ internalPinnedExecutor.execute(() -> {
+ try {
+ failPendingReceives();
+ failPendingBatchReceives();
+ } finally {
+ future.complete(null);
+ }
+ });
+ return future;
+ }
+ }
+
+ protected void failPendingReceives() {
while (!pendingReceives.isEmpty()) {
CompletableFuture<Message<T>> receiveFuture =
pendingReceives.poll();
if (receiveFuture == null) {
break;
}
if (!receiveFuture.isDone()) {
receiveFuture.completeExceptionally(
- new
PulsarClientException.AlreadyClosedException(String.format("The consumer which
subscribes the topic %s with subscription name %s " +
+ new PulsarClientException.AlreadyClosedException(
+ String.format("The consumer which subscribes
the topic %s with subscription name %s " +
"was already closed when cleaning and closing
the consumers", topic, subscription)));
}
}
}
- protected void
failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>>
pendingBatchReceives) {
- while (!pendingBatchReceives.isEmpty()) {
- OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+ protected void failPendingBatchReceives() {
Review comment:
Agreed, updating.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]