This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 8c59ca57da9 KAFKA-17455: fix stuck producer when throttling or 
retrying (#17527)
8c59ca57da9 is described below

commit 8c59ca57da94341708e9478ef2a728b1e5f1c155
Author: Colt McNealy <[email protected]>
AuthorDate: Thu Jan 9 10:27:04 2025 -0800

    KAFKA-17455: fix stuck producer when throttling or retrying (#17527)
    
    A producer might get stuck after it was throttled. This PR unblocks the
    producer by polling again after pollDelayMs in NetworkClientUtils#awaitReady().
    
    Reviewers: Matthias J. Sax <[email protected]>, David Jacot 
<[email protected]>
---
 .../apache/kafka/clients/NetworkClientUtils.java   | 11 ++++++
 .../java/org/apache/kafka/clients/MockClient.java  | 20 ++++++++++-
 .../clients/producer/internals/SenderTest.java     | 40 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index e044fd48ee3..3e161b1e993 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -71,6 +71,17 @@ public final class NetworkClientUtils {
                 throw new IOException("Connection to " + node + " failed.");
             }
             long pollTimeout = timeoutMs - (attemptStartTime - startTime); // 
initialize in this order to avoid overflow
+
+            // If the network client must wait before it can send data (e.g. due to throttling or retry backoff),
+            // polling for longer than that wait is potentially dangerous, because the producer will not attempt
+            // to send any pending requests while it is blocked in poll().
+            long waitingTime = client.pollDelayMs(node, startTime);
+            if (waitingTime > 0 && pollTimeout > waitingTime) {
+                // Block only until the next time it is okay to send data to the node,
+                // then wake up and try again.
+                pollTimeout = waitingTime;
+            }
+
             client.poll(pollTimeout, attemptStartTime);
             if (client.authenticationException(node) != null)
                 throw client.authenticationException(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 86d1ddf5f41..3e50d5abb07 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -71,6 +71,7 @@ public class MockClient implements KafkaClient {
 
     private int correlation;
     private Runnable wakeupHook;
+    private boolean advanceTimeDuringPoll;
     private final Time time;
     private final MockMetadataUpdater metadataUpdater;
     private final Map<String, ConnectionState> connections = new HashMap<>();
@@ -138,7 +139,11 @@ public class MockClient implements KafkaClient {
 
     @Override
     public long pollDelayMs(Node node, long now) {
-        return connectionDelay(node, now);
+        return connectionState(node.idString()).pollDelayMs(now);
+    }
+
+    public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
+        this.advanceTimeDuringPoll = advanceTimeDuringPoll;
     }
 
     public void backoff(Node node, long durationMs) {
@@ -335,6 +340,12 @@ public class MockClient implements KafkaClient {
             copy.add(response);
         }
 
+        // A real client that reaches the end of poll() with no responses would have blocked for timeoutMs,
+        // so simulate that by advancing the mock clock (note: this is done even when responses are returned).
+        if (advanceTimeDuringPoll) {
+            time.sleep(timeoutMs);
+        }
+
         return copy;
     }
 
@@ -794,6 +805,13 @@ public class MockClient implements KafkaClient {
             return 0;
         }
 
+        long pollDelayMs(long now) {
+            if (notThrottled(now))
+                return connectionDelay(now);
+
+            return throttledUntilMs - now;
+        }
+
         boolean ready(long now) {
             switch (state) {
                 case CONNECTED:
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index cfeefc0ae5f..8d44cbde5c9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -566,6 +566,46 @@ public class SenderTest {
         assertTrue(future.isDone(), "Request should be completed");
     }
 
+    @Test
+    public void 
senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
+        // We want MockClient#poll() to advance time so that the throttle eventually expires.
+        try {
+            client.advanceTimeDuringPoll(true);
+
+            ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
+            apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions);
+
+            setupWithTransactionState(txnManager);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+
+            int throttleTimeMs = 1000;
+            long startTime = time.milliseconds();
+            Node nodeToThrottle = metadata.fetch().nodeById(0);
+            client.throttle(nodeToThrottle, throttleTimeMs);
+
+            // Verify that the node is throttled. In a real Apache Kafka deployment, we observe that this can happen
+            // through throttling, as done above, or through a disconnect / backoff.
+            long currentPollDelay = client.pollDelayMs(nodeToThrottle, 
startTime);
+            assertEquals(currentPollDelay, throttleTimeMs);
+
+            txnManager.beginTransaction();
+            txnManager.maybeAddPartition(tp0);
+
+            assertFalse(txnManager.hasInFlightRequest());
+            sender.runOnce();
+            assertTrue(txnManager.hasInFlightRequest());
+
+            long totalTimeToRunOnce = time.milliseconds() - startTime;
+
+            // It should have blocked only for roughly throttleTimeMs, and in particular for less than REQUEST_TIMEOUT.
+            assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);
+
+        } finally {
+            client.advanceTimeDuringPoll(false);
+        }
+    }
+
     @Test
     public void testNodeLatencyStats() throws Exception {
         try (Metrics m = new Metrics()) {

Reply via email to