BewareMyPower commented on code in PR #24638: URL: https://github.com/apache/pulsar/pull/24638#discussion_r2284283296
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2229,13 +2229,15 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
                 // create operation will complete, the new consumer will be discarded.
                 log.info("[{}] Closed consumer before its creation was completed. consumerId={}", remoteAddress, consumerId);
+                consumers.remove(consumerId, consumerFuture);
                 commandSender.sendSuccessResponse(requestId);
                 return;
             }

             if (consumerFuture.isCompletedExceptionally()) {
                 log.info("[{}] Closed consumer that already failed to be created. consumerId={}", remoteAddress, consumerId);
+                consumers.remove(consumerId, consumerFuture);

Review Comment:
This line covers the exceptional completion of the future that handles the subscribe request. However, what I meant is that when you insert a new consumer future into `ServerCnx#consumers`, you should register a callback that removes it from `consumers` once it fails.

The current code looks like:

```java
final var consumers = new ConcurrentHashMap<Integer, CompletableFuture<Void>>();
final var future = new CompletableFuture<Void>();
final var key = 1;
// simulate handleSubscribe
consumers.put(key, future);
CompletableFuture.delayedExecutor(1, TimeUnit.HOURS).execute(() ->
        future.completeExceptionally(new RuntimeException("fail")));
Thread.sleep(1000);
// simulate handleClose
final var existingFuture = consumers.get(key);
if (existingFuture == null) {
    return;
}
if (!existingFuture.isDone() && existingFuture.completeExceptionally(new RuntimeException("timeout"))) {
    consumers.remove(key, existingFuture); // NOTE: this removal must not be omitted
    return;
}
if (existingFuture.isCompletedExceptionally()) {
    consumers.remove(key, existingFuture); // NOTE: this removal must not be omitted
    return;
}
System.out.println("close");
```

It can be improved to:

```java
consumers.put(key, future);
future.exceptionally(e -> {
    consumers.remove(key, future);
    return null;
});
/* ... */
if (!existingFuture.isDone() && existingFuture.completeExceptionally(new RuntimeException("timeout"))) {
    consumers.remove(key, existingFuture);
    return;
}
```

1. You no longer need to check `existingFuture.isCompletedExceptionally()`.
2. You no longer need to remove the future manually after calling `existingFuture.completeExceptionally`.
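To make the suggestion concrete, here is a minimal, self-contained sketch of the callback-based cleanup. It is not the actual `ServerCnx` code: the class `ConsumerFutureRegistry`, its methods `register`, `close`, and `contains`, the `Long` key type, and the returned status strings are all hypothetical names chosen for illustration. It relies only on the documented behavior that a non-async `exceptionally` callback runs as part of completing the future, so the map entry is already gone when `completeExceptionally` returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the `consumers` bookkeeping in ServerCnx.
final class ConsumerFutureRegistry {
    private final ConcurrentHashMap<Long, CompletableFuture<Void>> consumers =
            new ConcurrentHashMap<>();

    // Simulates handleSubscribe: insert the future and register the cleanup
    // callback once, so every failure path removes the entry automatically.
    CompletableFuture<Void> register(long consumerId) {
        final var future = new CompletableFuture<Void>();
        final var existing = consumers.putIfAbsent(consumerId, future);
        if (existing != null) {
            return existing;
        }
        future.exceptionally(e -> {
            // remove(key, value) only removes the entry if it still maps to this future
            consumers.remove(consumerId, future);
            return null;
        });
        return future;
    }

    // Simulates handleCloseConsumer without any manual removal on failure paths.
    String close(long consumerId) {
        final var existing = consumers.get(consumerId);
        if (existing == null) {
            return "not found";
        }
        if (!existing.isDone()
                && existing.completeExceptionally(new RuntimeException("closed before creation"))) {
            // The callback registered in register() has already removed the entry.
            return "cancelled pending creation";
        }
        // Normal close of a created consumer.
        consumers.remove(consumerId, existing);
        return "closed";
    }

    boolean contains(long consumerId) {
        return consumers.containsKey(consumerId);
    }

    public static void main(String[] args) {
        final var registry = new ConsumerFutureRegistry();

        registry.register(1L); // creation still pending
        System.out.println(registry.close(1L) + ", still tracked: " + registry.contains(1L));

        registry.register(2L).complete(null); // creation succeeded
        System.out.println(registry.close(2L) + ", still tracked: " + registry.contains(2L));

        registry.register(3L).completeExceptionally(new RuntimeException("fail")); // creation failed
        System.out.println(registry.close(3L) + ", still tracked: " + registry.contains(3L));
    }
}
```

With this shape, a failed creation never leaves a stale entry behind, so the close path only has to distinguish "pending" from "created". Whether the real code can register the callback at insertion time depends on how `handleSubscribe` builds its future, which this sketch does not model.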