lhotari commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r821545434
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1975,8 +1981,17 @@ private void failPendingBatchMessages(PulsarClientException ex) {
if (isBatchMessagingEnabled()) {
batchMessageAndSend();
}
- lastSendFuture = this.lastSendFuture;
+ if (lastSendFutureResponse) {
+ lastSendFuture = this.lastSendFutureEmpty;
+ } else {
+ lastSendFuture = this.lastSendFuture;
+ lastSendFuture.exceptionally(ignored -> {
+ lastSendFutureResponse = true;
+ return null;
+ });
+ }
}
+
return lastSendFuture.thenApply(ignored -> null);
Review comment:
Since the intention seems to be to ignore an exception after it has been
delivered once, it would be better to make that more explicit. It would be a
breaking change if calling flushAsync didn't deliver a possible exception to
the caller at all.
To prevent race conditions, this would have to be implemented in a different
way. The high-level solution would be to create a class (an internal private
static class would be fine) that contains two fields: a CompletableFuture and
an AtomicBoolean (or volatile) field. The "lastSendFuture" field should
reference this wrapper, and ProducerImpl should delegate actions to it. The
wrapper would contain the behavior that makes it return the exception only for
the "first call".
Technically, this could be implemented using CompletableFuture.handle. It's
possible to re-throw the exception in handle when it is wrapped in a
CompletionException. That would be done only on the "first call".
AtomicBoolean's compareAndSet could be used to ensure that the exception is
thrown only on the "first call". ("First call" isn't really a call in this
case, but I found it easier to explain it that way.)
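A rough sketch of what such a wrapper could look like, to illustrate the idea
rather than to prescribe the implementation. The class name
LastSendFutureWrapper, the field name exceptionDelivered, and the method name
handleOnce are hypothetical and do not exist in ProducerImpl; the type
parameter T stands in for whatever the real lastSendFuture carries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper (not part of ProducerImpl): pairs the last send future
// with a flag recording whether its failure was already handed to a caller.
final class LastSendFutureWrapper<T> {
    private final CompletableFuture<T> lastSendFuture;
    private final AtomicBoolean exceptionDelivered = new AtomicBoolean(false);

    LastSendFutureWrapper(CompletableFuture<T> lastSendFuture) {
        this.lastSendFuture = lastSendFuture;
    }

    // Returns a future that fails only for the first caller that observes the
    // failure; later callers get a normally completed future.
    CompletableFuture<Void> handleOnce() {
        return lastSendFuture.<Void>handle((ignored, t) -> {
            // compareAndSet lets exactly one caller win, even under concurrency.
            if (t != null && exceptionDelivered.compareAndSet(false, true)) {
                // Re-throw wrapped in CompletionException so the returned
                // future completes exceptionally with the original cause.
                throw t instanceof CompletionException
                        ? (CompletionException) t
                        : new CompletionException(t);
            }
            return null;
        });
    }
}
```

With something like this, flushAsync could return wrapper.handleOnce() in
place of lastSendFuture.thenApply(ignored -> null), so the caller still sees
the failure once and there is no separate boolean field to race on.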