jolshan commented on code in PR #12922:
URL: https://github.com/apache/kafka/pull/12922#discussion_r1035324593


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##########
@@ -808,6 +808,162 @@ public void testIdempotenceWithMultipleInflights() throws Exception {
         assertEquals(1, request2.get().offset());
     }
 
+    @Test
+    public void testIdempotenceWithDeliveryTimeout() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = createTransactionManager();
+        setupWithTransactionState(transactionManager);
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        // Send two produce requests with a time gap between them. This allows
+        // us to test delivery timeout expiration of the first request without
+        // affecting the second.
+        Future<RecordMetadata> request1 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(1, client.inFlightRequestCount());
+
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+
+        Future<RecordMetadata> request2 = appendToAccumulator(tp0);
+        sender.runOnce();
+        assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(2, client.inFlightRequestCount());
+
+        // The first request fails after the delivery timeout is reached, but
+        // we still expect the remaining request to be inflight.
+        time.sleep(DELIVERY_TIMEOUT_MS / 2);
+        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1L);
+        sender.runOnce();
+        TestUtils.assertFutureThrows(request1, TimeoutException.class);
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Later the second request returns successfully (which implies the first request
+        // was also successful). A successful result should be returned to the user.
+        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);

Review Comment:
   Do we have a case where this fails, implying that the first request was not successful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.