lhotari commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r821545434



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1975,8 +1981,17 @@ private void 
failPendingBatchMessages(PulsarClientException ex) {
             if (isBatchMessagingEnabled()) {
                 batchMessageAndSend();
             }
-            lastSendFuture = this.lastSendFuture;
+            if (lastSendFutureResponse) {
+                lastSendFuture = this.lastSendFutureEmpty;
+            } else {
+                lastSendFuture = this.lastSendFuture;
+                lastSendFuture.exceptionally(ignored -> {
+                    lastSendFutureResponse = true;
+                    return null;
+                });
+            }
         }
+
         return lastSendFuture.thenApply(ignored -> null);

Review comment:
       Since the intention seems to be to ignore an exception that is delivered 
once, it would be better to make that more explicit. It would be a breaking 
change if calling flushAsync wouldn't deliver a possible exception to the 
caller at all.
   
   To prevent race conditions this would have to be implemented in a different 
way. The high level solution would be to create a class (internal private 
static class would be fine) that contains 2 fields: a completable future and a 
AtomicBoolean field (or volate. The "lastSendFuture" field should reference 
this wrapper and ProducerImpl should delegate actions to it. The wrapper would 
contain behavior that makes it return the exception only for the "first call". 
Technically this could be implemented using CompletableFuture.handle .  It's 
possible to re-throw the exception in handle when wrapped with 
CompletionException. That would be done only the "first call". AtomicBoolean's 
compareAndSet could be used to ensure that the exception is thrown only on the 
"first call". ("first call" isn't really a call in this case, but I found it 
easier to explain it that way)
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to