This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 8c59ca57da9 KAFKA-17455: fix stuck producer when throttling or retrying (#17527)
8c59ca57da9 is described below
commit 8c59ca57da94341708e9478ef2a728b1e5f1c155
Author: Colt McNealy <[email protected]>
AuthorDate: Thu Jan 9 10:27:04 2025 -0800
KAFKA-17455: fix stuck producer when throttling or retrying (#17527)
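A producer might get stuck after it was throttled or while in retry backoff. This PR unblocks the
producer by capping the poll timeout in NetworkClientUtils#awaitReady() at pollDelayMs, so that
it wakes up and polls again as soon as it is allowed to send.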
Reviewers: Matthias J. Sax <[email protected]>, David Jacot <[email protected]>
---
.../apache/kafka/clients/NetworkClientUtils.java | 11 ++++++
.../java/org/apache/kafka/clients/MockClient.java | 20 ++++++++++-
.../clients/producer/internals/SenderTest.java | 40 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
index e044fd48ee3..3e161b1e993 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -71,6 +71,17 @@ public final class NetworkClientUtils {
throw new IOException("Connection to " + node + " failed.");
}
long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow
+
+ // If the client must wait before it can send to this node (e.g. throttling or retry backoff),
+ // polling longer than that is potentially dangerous: pending requests would not be sent until
+ // poll() returns. Measure the delay from the current attempt so later iterations don't overshoot.
+ long waitingTime = client.pollDelayMs(node, attemptStartTime);
+ if (waitingTime > 0 && pollTimeout > waitingTime) {
+ // Block only until the next time it is okay to send data to this node,
+ // then wake up and try again.
+ pollTimeout = waitingTime;
+ }
+
client.poll(pollTimeout, attemptStartTime);
if (client.authenticationException(node) != null)
throw client.authenticationException(node);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 86d1ddf5f41..3e50d5abb07 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -71,6 +71,7 @@ public class MockClient implements KafkaClient {
private int correlation;
private Runnable wakeupHook;
+ private boolean advanceTimeDuringPoll;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Map<String, ConnectionState> connections = new HashMap<>();
@@ -138,7 +139,11 @@ public class MockClient implements KafkaClient {
@Override
public long pollDelayMs(Node node, long now) {
- return connectionDelay(node, now);
+ return connectionState(node.idString()).pollDelayMs(now);
+ }
+
+ public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
+ this.advanceTimeDuringPoll = advanceTimeDuringPoll;
}
public void backoff(Node node, long durationMs) {
@@ -335,6 +340,12 @@ public class MockClient implements KafkaClient {
copy.add(response);
}
+ // In real life, a poll() that gets to the end with no responses would have blocked for
+ // timeoutMs; simulate that passage of time (but return immediately when responses exist).
+ if (advanceTimeDuringPoll && copy.isEmpty()) {
+ time.sleep(timeoutMs);
+ }
+
return copy;
}
@@ -794,6 +805,13 @@ public class MockClient implements KafkaClient {
return 0;
}
+ long pollDelayMs(long now) {
+ if (notThrottled(now))
+ return connectionDelay(now);
+
+ return throttledUntilMs - now;
+ }
+
boolean ready(long now) {
switch (state) {
case CONNECTED:
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index cfeefc0ae5f..8d44cbde5c9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -566,6 +566,46 @@ public class SenderTest {
assertTrue(future.isDone(), "Request should be completed");
}
+ @Test
+ public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
+ // We want MockClient#poll() to advance time so that eventually the throttle expires.
+ try {
+ client.advanceTimeDuringPoll(true);
+
+ ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+ apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
+ TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
+
+ setupWithTransactionState(txnManager);
+ doInitTransactions(txnManager, producerIdAndEpoch);
+
+ int throttleTimeMs = 1000;
+ long startTime = time.milliseconds();
+ Node nodeToThrottle = metadata.fetch().nodeById(0);
+ client.throttle(nodeToThrottle, throttleTimeMs);
+
+ // Verify the node is throttled. In a real cluster, the same stall can be caused by broker
+ // throttling (as simulated above) or by a disconnect followed by reconnect backoff.
+ long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
+ assertEquals(throttleTimeMs, currentPollDelay);
+
+ txnManager.beginTransaction();
+ txnManager.maybeAddPartition(tp0);
+
+ assertFalse(txnManager.hasInFlightRequest());
+ sender.runOnce();
+ assertTrue(txnManager.hasInFlightRequest());
+
+ long totalTimeToRunOnce = time.milliseconds() - startTime;
+
+ // It should have blocked roughly for the throttle time only, not for the full request timeout.
+ assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT, "Sender blocked for " + totalTimeToRunOnce + " ms");
+
+ } finally {
+ client.advanceTimeDuringPoll(false);
+ }
+ }
+
@Test
public void testNodeLatencyStats() throws Exception {
try (Metrics m = new Metrics()) {