This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6a2789cf706 KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator (#16844)
6a2789cf706 is described below
commit 6a2789cf70665bc564bfa582fec17f42b93f3616
Author: TengYao Chi <[email protected]>
AuthorDate: Sun Sep 1 21:59:21 2024 +0800
KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator (#16844)
Reviewers: Lianet Magrans <[email protected]>, TaiJuWu <[email protected]>
---
.../internals/AbstractHeartbeatRequestManager.java | 1 +
.../consumer/internals/CommitRequestManager.java | 9 +--------
.../internals/CoordinatorRequestManager.java | 16 ++++++++++++++++
.../internals/CommitRequestManagerTest.java | 12 ++++++++----
.../ConsumerHeartbeatRequestManagerTest.java | 22 ++++++++++++++++++++++
5 files changed, 48 insertions(+), 12 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index b310f6de1b9..7e923ca0e46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -307,6 +307,7 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
resetHeartbeatState();
membershipManager().onHeartbeatFailure(exception instanceof
RetriableException);
if (exception instanceof RetriableException) {
+ coordinatorRequestManager.handleCoordinatorDisconnect(exception,
responseTimeMs);
String message = String.format("%s failed because of the retriable
exception. Will retry in %s ms: %s",
heartbeatRequestName(),
heartbeatRequestState.remainingBackoffMs(responseTimeMs),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 814250e0949..939d078ebaf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -24,7 +24,6 @@ import
org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import
org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -565,12 +564,6 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
return pendingRequests.unsentOffsetFetches;
}
- private void handleCoordinatorDisconnect(Throwable exception, long
currentTimeMs) {
- if (exception instanceof DisconnectException) {
-
coordinatorRequestManager.markCoordinatorUnknown(exception.getMessage(),
currentTimeMs);
- }
- }
-
/**
* Update latest member ID and epoch used by the member.
*
@@ -883,7 +876,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
} else {
log.debug("{} completed with error", requestDescription(),
error);
onFailedAttempt(requestCompletionTimeMs);
- handleCoordinatorDisconnect(error,
requestCompletionTimeMs);
+
coordinatorRequestManager.handleCoordinatorDisconnect(error,
requestCompletionTimeMs);
future().completeExceptionally(error);
}
} catch (Throwable t) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 5144b41f574..a9e1bf46bed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
@@ -122,6 +123,21 @@ public class CoordinatorRequestManager implements
RequestManager {
});
}
+ /**
+ * Handles the disconnection of the current coordinator.
+ * This method checks if the given exception is an instance of {@link
DisconnectException}.
+ * If so, it marks the coordinator as unknown, indicating that the client
should
+ * attempt to discover a new coordinator. For any other exception type, no
action is performed.
+ *
+ * @param exception The exception to handle, which was received as
part of a request response.
+ * @param currentTimeMs The current time in milliseconds.
+ */
+ public void handleCoordinatorDisconnect(Throwable exception, long
currentTimeMs) {
+ if (exception instanceof DisconnectException) {
+ markCoordinatorUnknown(exception.getMessage(), currentTimeMs);
+ }
+ }
+
/**
* Mark the current coordinator null.
*
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 33d2d6aae1b..523abf4125c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -459,7 +459,7 @@ public class CommitRequestManagerTest {
// Commit should mark the coordinator unknown and fail with
RetriableCommitFailedException.
assertTrue(commitResult.isDone());
assertFutureThrows(commitResult, RetriableCommitFailedException.class);
- assertCoordinatorDisconnect();
+ assertCoordinatorDisconnectHandling();
}
@Test
@@ -752,7 +752,7 @@ public class CommitRequestManagerTest {
// Request not completed just yet
assertFalse(result.isDone());
if (shouldRediscoverCoordinator) {
- assertCoordinatorDisconnect();
+ assertCoordinatorDisconnectOnCoordinatorError();
}
// Request should be retried with backoff.
@@ -782,7 +782,7 @@ public class CommitRequestManagerTest {
// Request not completed just yet, but should have marked the
coordinator unknown
assertFalse(result.isDone());
- assertCoordinatorDisconnect();
+ assertCoordinatorDisconnectHandling();
time.sleep(retryBackoffMs);
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
@@ -922,7 +922,11 @@ public class CommitRequestManagerTest {
assertRetryBackOff(commitRequestManager, retryBackoffMs);
}
- private void assertCoordinatorDisconnect() {
+ private void assertCoordinatorDisconnectHandling() {
+ verify(coordinatorRequestManager).handleCoordinatorDisconnect(any(),
anyLong());
+ }
+
+ private void assertCoordinatorDisconnectOnCoordinatorError() {
verify(coordinatorRequestManager).markCoordinatorUnknown(any(),
anyLong());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index f392f87489d..ee5c56296fd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -27,6 +27,7 @@ import
org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -380,6 +381,27 @@ public class ConsumerHeartbeatRequestManagerTest {
assertEquals(1, result.unsentRequests.size());
}
+ @Test
+ public void testDisconnect() {
+ createHeartbeatRequestStateWithZeroHeartbeatInterval();
+ NetworkClientDelegate.PollResult result =
heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ // Mimic disconnect
+ result.unsentRequests.get(0).handler().onFailure(time.milliseconds(),
DisconnectException.INSTANCE);
+ verify(membershipManager).onHeartbeatFailure(true);
+ // Ensure that the coordinatorManager rediscovers the coordinator
+ verify(coordinatorRequestManager).handleCoordinatorDisconnect(any(),
anyLong());
+ verify(backgroundEventHandler, never()).add(any());
+
+ time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result.unsentRequests.size(), "No request should be
generated before the backoff expires");
+
+ time.sleep(1);
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size(), "A new request should be
generated after the backoff expires");
+ }
+
@Test
public void testFailureOnFatalException() {
// The initial heartbeatInterval is set to 0