lhotari commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r821529738



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -116,6 +116,9 @@
 
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
+    private final CompletableFuture<MessageId> lastSendFutureEmpty = CompletableFuture.completedFuture(null);

Review comment:
       A CompletableFuture isn't immutable and shouldn't be shared.
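
One way to see the concern: even an already-completed CompletableFuture can have its result replaced by anyone holding a reference to it. The sketch below is illustrative only; the class name `SharedFutureDemo` and its method are hypothetical and are not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not taken from the PR.
final class SharedFutureDemo {

    // Shows that a completed future is still mutable through obtrudeValue.
    static String demo() {
        CompletableFuture<String> shared = CompletableFuture.completedFuture("ok");
        String before = shared.join();

        // Any holder of the reference can forcibly overwrite the result.
        shared.obtrudeValue("overwritten");
        String after = shared.join();

        return before + " -> " + after;
    }
}
```

`obtrudeException` behaves the same way for failures, which is why a single long-lived instance handed to several callers is fragile.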

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -116,6 +116,9 @@
 
     private final BatchMessageContainerBase batchMessageContainer;
     private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
+    private final CompletableFuture<MessageId> lastSendFutureEmpty = CompletableFuture.completedFuture(null);
+    private volatile boolean lastSendFutureResponse = false;

Review comment:
       Adding a new field isn't necessary. (I'll explain later)

##########
File path: 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
##########
@@ -264,4 +266,49 @@ public void testGetNumOfPartitions() throws Exception {
         assertEquals(producerImpl.getNumOfPartitions(), 0);
     }
 
+    @Test
+    public void testFlushWhenLastSendFutureFailed() {
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        ProducerImpl<Object> producerImpl = client.newProducerImpl(TOPIC_NAME, 0, producerConfData,
+                null, null, null, Optional.empty());
+
+        // 1. When no data is sent to this producerImpl,
+        // its lastSendFuture is always in normal completion state
+        CompletableFuture<Void> lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertFalse(lastSendFuture.isCompletedExceptionally());
+
+        // 2. We set the lastSendFuture of this partition to an abnormal state,
+        // simulating that an exception occurred during the sending process
+        final String failedMessage = "failed last send future";
+        producerImpl.setLastSendFuture(FutureUtil.failedFuture(new Throwable(failedMessage)));
+
+        // 3. So when we get its lastSendFuture again,
+        // the future is already in an abnormal failure state
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertTrue(lastSendFuture.isCompletedExceptionally());
+        // 4. The following simple simulation application captures the exception processing
+        lastSendFuture.exceptionally(throwable -> {
+            assertNotNull(throwable);
+            assertEquals(throwable.getMessage(), failedMessage);
+            return null;
+        });
+
+        // 5. We have already handled the exception in the step 4,
+        // and then PartitionedProducerImpl will continue to send data.
+        // It should be noted that when this partition is not selected for data transmission this time,
+        // its lastSendFuture is still the future that was in an abnormal state in the previous step.
+        // When the application calls the flush operation again, in the previous logic,
+        // its exception future will be returned to the application,
+        // causing the application to always execute the exception handling logic.
+        // In fact, we have handled the exception before,
+        // and we did not send data to this partition this time,
+        // it should not affect this transmission.
+        // So we want the lastSendFuture here to be in normal state.
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());

Review comment:
       This should be tested by observing the external behavior, for example by
sending a message while the connection is broken. The flush should fail only
once, and it should be possible to send more messages after the connection
resumes.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -2209,5 +2224,26 @@ boolean isErrorStat() {
         return errorState;
     }
 
+    @VisibleForTesting
+    CompletableFuture<Void> getLastSendFuture() {
+        CompletableFuture<MessageId> lastSendFuture;
+        if (lastSendFutureResponse) {
+            lastSendFuture = this.lastSendFutureEmpty;
+        } else {
+            lastSendFuture = this.lastSendFuture;
+            lastSendFuture.exceptionally(ignored -> {
+                lastSendFutureResponse = true;
+                return null;
+            });
+        }
+
+        return lastSendFuture.thenApply(ignored -> null);
+    }
+
+    @VisibleForTesting
+    void setLastSendFuture(CompletableFuture<MessageId> lastSendFuture) {
+        this.lastSendFuture = lastSendFuture;
+    }
+

Review comment:
       It's better to focus on testing external behavior that can be observed.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1975,8 +1981,17 @@ private void failPendingBatchMessages(PulsarClientException ex) {
             if (isBatchMessagingEnabled()) {
                 batchMessageAndSend();
             }
-            lastSendFuture = this.lastSendFuture;
+            if (lastSendFutureResponse) {
+                lastSendFuture = this.lastSendFutureEmpty;
+            } else {
+                lastSendFuture = this.lastSendFuture;
+                lastSendFuture.exceptionally(ignored -> {
+                    lastSendFutureResponse = true;
+                    return null;
+                });
+            }
         }
+
         return lastSendFuture.thenApply(ignored -> null);

Review comment:
       Since the intention seems to be to ignore an exception once it has been
delivered, it would be better to make that more explicit. It would be a
breaking change if calling flushAsync didn't deliver a possible exception to
the caller at all.

   To prevent race conditions, this would have to be implemented in a different
way. The high-level solution would be to create a class that contains 2 fields:
a CompletableFuture and an AtomicBoolean (or a volatile boolean). The
"lastSendFuture" field should reference this wrapper, and ProducerImpl should
delegate actions to it. The wrapper would contain behavior that makes it return
the exception only for the "first call". Technically, this could be implemented
using CompletableFuture.handle. It's possible to rethrow the exception in
handle when it is wrapped in a CompletionException. That would be done only on
the "first call". AtomicBoolean's compareAndSet could be used to ensure that the
exception is thrown only on the "first call". ("First call" isn't really a call
in this case, but I found it easier to explain it that way.)
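
A minimal sketch of the wrapper idea described above, using only standard `java.util.concurrent` APIs. The class name `LastSendFutureWrapper` and the method name `handleOnce` are hypothetical; they do not come from the PR or from Pulsar, and the real change may look quite different.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: pairs a future with a flag that records whether
// its failure has already been delivered to a caller.
final class LastSendFutureWrapper<T> {
    private final CompletableFuture<T> lastSendFuture;
    private final AtomicBoolean failureDelivered = new AtomicBoolean(false);

    LastSendFutureWrapper(CompletableFuture<T> lastSendFuture) {
        this.lastSendFuture = lastSendFuture;
    }

    // Returns a derived future. If the wrapped future failed, only the first
    // derived future to win the compareAndSet rethrows the failure; later
    // ones complete normally with null.
    CompletableFuture<Void> handleOnce() {
        return lastSendFuture.handle((ignored, throwable) -> {
            if (throwable != null && failureDelivered.compareAndSet(false, true)) {
                // Rethrow wrapped in CompletionException so the derived
                // future completes exceptionally.
                throw throwable instanceof CompletionException
                        ? (CompletionException) throwable
                        : new CompletionException(throwable);
            }
            return null;
        });
    }
}
```

With this shape, a flush-style method could return `wrapper.handleOnce()` and the producer would replace the wrapper whenever a new send future is assigned, so a fresh failure gets a fresh flag. That wiring is an assumption and is not shown here.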
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

