michaeljmarshall commented on a change in pull request #14602:
URL: https://github.com/apache/pulsar/pull/14602#discussion_r822322836



##########
File path: pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
##########
@@ -264,4 +266,49 @@ public void testGetNumOfPartitions() throws Exception {
         assertEquals(producerImpl.getNumOfPartitions(), 0);
     }
 
+    @Test
+    public void testFlushWhenLastSendFutureFailed() {
+        ProducerConfigurationData producerConfData = new ProducerConfigurationData();
+        ProducerImpl<Object> producerImpl = client.newProducerImpl(TOPIC_NAME, 0, producerConfData,
+                null, null, null, Optional.empty());
+
+        // 1. When no data is sent to this producerImpl,
+        // its lastSendFuture is always in normal completion state
+        CompletableFuture<Void> lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertFalse(lastSendFuture.isCompletedExceptionally());
+
+        // 2. We set the lastSendFuture of this partition to an abnormal state,
+        // simulating that an exception occurred during the sending process
+        final String failedMessage = "failed last send future";
+        producerImpl.setLastSendFuture(FutureUtil.failedFuture(new Throwable(failedMessage)));
+
+        // 3. So when we get its lastSendFuture again,
+        // the future is already in an abnormal failure state
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());
+        assertTrue(lastSendFuture.isCompletedExceptionally());
+        // 4. The following simple simulation application captures the exception processing
+        lastSendFuture.exceptionally(throwable -> {
+            assertNotNull(throwable);
+            assertEquals(throwable.getMessage(), failedMessage);
+            return null;
+        });
+
+        // 5. We have already handled the exception in the step 4,
+        // and then PartitionedProducerImpl will continue to send data.
+        // It should be noted that when this partition is not selected for data transmission this time,
+        // its lastSendFuture is still the future that was in an abnormal state in the previous step.
+        // When the application calls the flush operation again, in the previous logic,
+        // its exception future will be returned to the application,
+        // causing the application to always execute the exception handling logic.
+        // In fact, we have handled the exception before,
+        // and we did not send data to this partition this time,
+        // it should not affect this transmission.
+        // So we want the lastSendFuture here to be in normal state.
+        lastSendFuture = producerImpl.getLastSendFuture();
+        assertTrue(lastSendFuture.isDone());

Review comment:
       @wenbingshen - let me know if you need pointers to relevant tests. I can try to help find good tests tomorrow (my time).
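
For context, the situation described in step 5 of the quoted test can be reproduced with plain `java.util.concurrent.CompletableFuture`, without any Pulsar classes. The sketch below is only an illustration under stated assumptions: `FakePartitionProducer` and the helper methods are hypothetical stand-ins, not Pulsar's `ProducerImpl`, and it does not show how the PR itself resolves the issue. It shows that handling a failure with `exceptionally()` yields a new future, while the stored future stays failed and would be seen again by any later caller that reads it.

```java
import java.util.concurrent.CompletableFuture;

public class LastSendFutureSketch {

    // Hypothetical stand-in for one partition's producer; not Pulsar's ProducerImpl.
    static class FakePartitionProducer {
        private CompletableFuture<Void> lastSendFuture = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> getLastSendFuture() {
            return lastSendFuture;
        }

        void setLastSendFuture(CompletableFuture<Void> future) {
            this.lastSendFuture = future;
        }
    }

    // Builds a producer whose last send failed, then "handles" the failure once.
    static FakePartitionProducer producerAfterHandledFailure() {
        FakePartitionProducer producer = new FakePartitionProducer();
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("failed last send future"));
        producer.setLastSendFuture(failed);
        // exceptionally() returns a NEW future; the stored one is not modified.
        producer.getLastSendFuture().exceptionally(t -> null);
        return producer;
    }

    // True if the future kept by the producer is still in the failed state.
    static boolean storedFutureStillFailed() {
        return producerAfterHandledFailure().getLastSendFuture().isCompletedExceptionally();
    }

    // True if the future returned by exceptionally() is itself failed.
    static boolean handledCopyFailed() {
        FakePartitionProducer producer = producerAfterHandledFailure();
        CompletableFuture<Void> handled = producer.getLastSendFuture().exceptionally(t -> null);
        return handled.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println("stored future still failed: " + storedFutureStillFailed());
        System.out.println("handled copy failed: " + handledCopyFailed());
    }
}
```

In this sketch the stored future remains failed after the handler has run, which matches the test comment's point that a flush which simply returns the stored future would keep surfacing an exception the application has already dealt with.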




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


