BewareMyPower commented on code in PR #24638:
URL: https://github.com/apache/pulsar/pull/24638#discussion_r2284283296


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2229,13 +2229,15 @@ protected void handleCloseConsumer(CommandCloseConsumer 
closeConsumer) {
             // create operation will complete, the new consumer will be 
discarded.
             log.info("[{}] Closed consumer before its creation was completed. 
consumerId={}",
                      remoteAddress, consumerId);
+            consumers.remove(consumerId, consumerFuture);
             commandSender.sendSuccessResponse(requestId);
             return;
         }
 
         if (consumerFuture.isCompletedExceptionally()) {
             log.info("[{}] Closed consumer that already failed to be created. 
consumerId={}",
                      remoteAddress, consumerId);
+            consumers.remove(consumerId, consumerFuture);

Review Comment:
   This line is the `exceptionally` on the future of handling the subscribe 
request. However, what I meant is that when you inserted a new consumer future 
to `ServerCnx#consumers`, you'd better register a callback to remove it from 
`consumers` once it failed.
   
   The current code looks like:
   
   ```java
           final var consumers = new ConcurrentHashMap<Integer, 
CompletableFuture<Void>>();
           final var future = new CompletableFuture<Void>();
           final var key = 1;
           // simulate handleSubscribe
           consumers.put(key, future);
           CompletableFuture.delayedExecutor(1, TimeUnit.HOURS).execute(() ->
                   future.completeExceptionally(new RuntimeException("fail")));
   
           Thread.sleep(1000);
   
           // simulate handleClose
           final var existingFuture = consumers.get(key);
           if (existingFuture == null) {
               return;
           }
           if (!existingFuture.isDone() && 
existingFuture.completeExceptionally(new RuntimeException("timeout"))) {
               consumers.remove(key, existingFuture); // NOTE: it should not be 
missed
               return;
           }
           if (existingFuture.isCompletedExceptionally()) {
               consumers.remove(key, existingFuture); // NOTE: it should not be 
missed
               return;
           }
           System.out.println("close");
   ```
   
   It can be improved to:
   
   ```java
           consumers.put(key, future);
           future.exceptionally(e -> {
               consumers.remove(key, future);
               return null;
           });
           /* ... */
           if (!existingFuture.isDone() && 
existingFuture.completeExceptionally(new RuntimeException("timeout"))) {
               consumers.remove(key, existingFuture);
               return;
           }
   ```
   
   1. You don't need to check `existingFuture.isCompletedExceptionally()` 
anymore
   2. You don't need to manually remove the future after calling 
`existingFuture.completeExceptionally`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to