This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6a4078ddde7 KAFKA-16215; KAFKA-16178; Fix member not rejoining after
error (#15311)
6a4078ddde7 is described below
commit 6a4078ddde7888ca12582b6dff13f9ece0cf4d59
Author: Lianet Magrans <[email protected]>
AuthorDate: Mon Feb 12 02:16:57 2024 -0500
KAFKA-16215; KAFKA-16178; Fix member not rejoining after error (#15311)
This fixes a bug that prevented members from rejoining the group after
receiving an error in the heartbeat response (e.g. fenced, not coordinator,
as reported in KAFKA-16215 and KAFKA-16178). The issue was that when a
response with an error was received, the response receive time was not
updated, so the following heartbeat was skipped because a previous one was
still considered to be in flight.
Reviewers: Andrew Schofield <[email protected]>, David Jacot
<[email protected]>
---
.../internals/HeartbeatRequestManager.java | 17 +++++++++++-----
.../internals/HeartbeatRequestManagerTest.java | 23 +++++++++++++++++++---
2 files changed, 32 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index 03e11ddfa02..246ea05b220 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -267,11 +267,12 @@ public class HeartbeatRequestManager implements
RequestManager {
return logResponse(request);
else
return request.whenComplete((response, exception) -> {
+ long completionTimeMs = request.handler().completionTimeMs();
if (response != null) {
metricsManager.recordRequestLatency(response.requestLatencyMs());
- onResponse((ConsumerGroupHeartbeatResponse)
response.responseBody(), request.handler().completionTimeMs());
+ onResponse((ConsumerGroupHeartbeatResponse)
response.responseBody(), completionTimeMs);
} else {
- onFailure(exception, request.handler().completionTimeMs());
+ onFailure(exception, completionTimeMs);
}
});
}
@@ -341,9 +342,8 @@ public class HeartbeatRequestManager implements
RequestManager {
String message;
this.heartbeatState.reset();
+ this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
- // TODO: upon encountering a fatal/fenced error, trigger
onPartitionLost logic to give up the current
- // assignments.
switch (error) {
case NOT_COORDINATOR:
// the manager should retry immediately when the coordinator
node becomes available again
@@ -352,6 +352,8 @@ public class HeartbeatRequestManager implements
RequestManager {
coordinatorRequestManager.coordinator());
logInfo(message, response, currentTimeMs);
coordinatorRequestManager.markCoordinatorUnknown(errorMessage,
currentTimeMs);
+ // Skip backoff so that the next HB is sent as soon as the new
coordinator is discovered
+ heartbeatRequestState.reset();
break;
case COORDINATOR_NOT_AVAILABLE:
@@ -360,6 +362,8 @@ public class HeartbeatRequestManager implements
RequestManager {
coordinatorRequestManager.coordinator());
logInfo(message, response, currentTimeMs);
coordinatorRequestManager.markCoordinatorUnknown(errorMessage,
currentTimeMs);
+ // Skip backoff so that the next HB is sent as soon as the new
coordinator is discovered
+ heartbeatRequestState.reset();
break;
case COORDINATOR_LOAD_IN_PROGRESS:
@@ -368,7 +372,6 @@ public class HeartbeatRequestManager implements
RequestManager {
"Will retry",
coordinatorRequestManager.coordinator());
logInfo(message, response, currentTimeMs);
- heartbeatRequestState.onFailedAttempt(currentTimeMs);
break;
case GROUP_AUTHORIZATION_FAILED:
@@ -397,6 +400,8 @@ public class HeartbeatRequestManager implements
RequestManager {
membershipManager.memberId(),
membershipManager.memberEpoch());
logInfo(message, response, currentTimeMs);
membershipManager.transitionToFenced();
+ // Skip backoff so that a next HB to rejoin is sent as soon as
the fenced member releases its assignment
+ heartbeatRequestState.reset();
break;
case UNKNOWN_MEMBER_ID:
@@ -404,6 +409,8 @@ public class HeartbeatRequestManager implements
RequestManager {
membershipManager.memberId());
logInfo(message, response, currentTimeMs);
membershipManager.transitionToFenced();
+ // Skip backoff so that a next HB to rejoin is sent as soon as
the fenced member releases its assignment
+ heartbeatRequestState.reset();
break;
default:
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 0df6fa94c39..bc014efa773 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -445,16 +445,26 @@ public class HeartbeatRequestManagerTest {
case COORDINATOR_LOAD_IN_PROGRESS:
verify(backgroundEventHandler, never()).add(any());
- assertEquals(DEFAULT_RETRY_BACKOFF_MS,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+ assertEquals(DEFAULT_RETRY_BACKOFF_MS,
+
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()), "Request should " +
+ "backoff after receiving a coordinator load in
progress error. ");
break;
case COORDINATOR_NOT_AVAILABLE:
case NOT_COORDINATOR:
verify(backgroundEventHandler, never()).add(any());
verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
- assertEquals(0,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()));
+ assertEquals(0,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
+ "Request should not apply backoff so that the next
heartbeat is sent " +
+ "as soon as the new coordinator is discovered.");
+ break;
+ case UNKNOWN_MEMBER_ID:
+ case FENCED_MEMBER_EPOCH:
+ verify(backgroundEventHandler, never()).add(any());
+ assertEquals(0,
heartbeatRequestState.nextHeartbeatMs(time.milliseconds()),
+ "Request should not apply backoff so that the next
heartbeat to rejoin is " +
+ "sent as soon as the fenced member releases its
assignment.");
break;
-
default:
if (isFatal) {
// The memberStateManager should have stopped heartbeat at
this point
@@ -465,6 +475,13 @@ public class HeartbeatRequestManagerTest {
}
break;
}
+
+ if (!isFatal) {
+ // Make sure a next heartbeat is sent for all non-fatal errors (to
retry or rejoin)
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ }
}
@Test