git-enzo opened a new issue #7545:
URL: https://github.com/apache/pulsar/issues/7545
**Describe the bug**
In the MultiTopicsConsumerImpl class, an exception can be swallowed in the
error handling that runs while subscribing to a new topic.
This can happen in the ```handleSubscribeOneTopicError()``` method,
which is invoked from the ```exceptionally``` block of a CompletableFuture.
This handler is responsible for exceptionally completing the future
named ```subscribeFuture``` (code below). The future is completed in only one
place, and just before that the ```checkState()``` call may throw an
exception. That exception is swallowed, and as a result ```subscribeFuture```
is never completed.
```
if (toCloseNum.decrementAndGet() == 0) {
    log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
        topic, topicName, error.getMessage());
    topics.remove(topicName);
    // a possible swallowed exception
    checkState(allTopicPartitionsNumber.get() == consumers.values().size());
    subscribeFuture.completeExceptionally(error);
}
```
**To Reproduce**
The behavior above can be reproduced with the following sample test code:
```
CompletableFuture<Void> someMethod() {
    // always fails for test purposes
    return CompletableFuture.failedFuture(new RuntimeException("someMethod error"));
}

CompletableFuture<Void> mainMethod() {
    CompletableFuture<Void> toFinishFuture = new CompletableFuture<>();
    someMethod().thenAccept(x -> {
        // some code
        // ...
        toFinishFuture.complete(null);
    }).exceptionally(throwable -> {
        // always fails for test purposes, throws an exception, which is swallowed
        checkState(1 == 2);
        toFinishFuture.completeExceptionally(new RuntimeException("some error"));
        return null;
    });
    return toFinishFuture;
}

@Test
void swallowedExceptionTest() throws ExecutionException, InterruptedException, TimeoutException {
    CompletableFuture<Void> completableFuture = mainMethod();
    // the completableFuture will never complete, the TimeoutException will be thrown
    completableFuture.whenComplete((aVoid, throwable) -> {
        if (throwable == null) {
            System.out.println("OK");
        } else {
            System.out.println("ERROR");
        }
    }).get(20, TimeUnit.SECONDS);
}
```
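To make it clearer where the exception goes, here is a minimal, self-contained sketch that uses only the JDK. It assumes the failure is not lost inside the JVM: it ends up in the new stage returned by ```exceptionally()```, which the code above discards. The class name, ```derivedStage()```, and the local ```checkState()``` helper are hypothetical stand-ins for illustration; they are not Pulsar code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class SwallowedExceptionDemo {

    // Local stand-in for the checkState() call used in the issue (an assumption for this sketch).
    static void checkState(boolean expression) {
        if (!expression) {
            throw new IllegalStateException("state check failed");
        }
    }

    // Returns the stage created by exceptionally(), which the original code throws away.
    static CompletableFuture<Void> derivedStage(CompletableFuture<Void> toFinishFuture) {
        CompletableFuture<Void> source = new CompletableFuture<>();
        source.completeExceptionally(new RuntimeException("someMethod error"));

        return source.exceptionally(throwable -> {
            checkState(1 == 2); // throws, so the next line is never reached
            toFinishFuture.completeExceptionally(new RuntimeException("some error"));
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> toFinishFuture = new CompletableFuture<>();
        CompletableFuture<Void> derived = derivedStage(toFinishFuture);

        // The future handed to callers is still pending.
        System.out.println("toFinishFuture done: " + toFinishFuture.isDone());
        // The discarded stage is the one that failed.
        System.out.println("derived failed: " + derived.isCompletedExceptionally());
        try {
            derived.join();
        } catch (CompletionException e) {
            System.out.println("hidden cause: " + e.getCause());
        }
    }
}
```

Under these assumptions, the caller-facing future stays incomplete while the failure sits in a stage that nothing observes, which matches the timeout seen in the test above.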
**Expected behavior**
The exception should not be swallowed, and the completable future
named ```subscribeFuture``` should always be completed inside
the ```handleSubscribeOneTopicError()``` method.
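
One possible way to get this behavior is sketched below. It is not the project's actual fix: ```completeWithError()```, its parameters, and the local ```checkState()``` helper are hypothetical names chosen for illustration. The idea is to run the state check inside ```try```, attach any failure to the original error as a suppressed exception, and complete the future in ```finally``` so that it is completed on every path.

```java
import java.util.concurrent.CompletableFuture;

public class AlwaysCompleteSketch {

    // Local stand-in for the checkState() call used in the issue (an assumption for this sketch).
    static void checkState(boolean expression) {
        if (!expression) {
            throw new IllegalStateException("state check failed");
        }
    }

    // Hypothetical tail of the error handler: the future is completed no matter what the check does.
    static void completeWithError(CompletableFuture<Void> subscribeFuture, Throwable error,
                                  int expectedPartitions, int actualConsumers) {
        try {
            checkState(expectedPartitions == actualConsumers);
        } catch (IllegalStateException stateError) {
            // Keep the failed check visible instead of losing it.
            error.addSuppressed(stateError);
        } finally {
            subscribeFuture.completeExceptionally(error);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> subscribeFuture = new CompletableFuture<>();
        RuntimeException error = new RuntimeException("subscribe failed");

        // Mismatched counts make the check fail, yet the future is still completed.
        completeWithError(subscribeFuture, error, 3, 2);

        System.out.println("completed exceptionally: " + subscribeFuture.isCompletedExceptionally());
        System.out.println("suppressed count: " + error.getSuppressed().length);
    }
}
```

Whether the failed check should be attached as a suppressed exception, logged, or reported in place of the original error is a design choice left open here; the sketch only shows that the future can be completed in all cases.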