This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a2789cf706 KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator (#16844)
6a2789cf706 is described below

commit 6a2789cf70665bc564bfa582fec17f42b93f3616
Author: TengYao Chi <[email protected]>
AuthorDate: Sun Sep 1 21:59:21 2024 +0800

    KAFKA-17293: New consumer HeartbeatRequestManager should rediscover disconnected coordinator (#16844)
    
    Reviewers: Lianet Magrans <[email protected]>, TaiJuWu <[email protected]>
---
 .../internals/AbstractHeartbeatRequestManager.java |  1 +
 .../consumer/internals/CommitRequestManager.java   |  9 +--------
 .../internals/CoordinatorRequestManager.java       | 16 ++++++++++++++++
 .../internals/CommitRequestManagerTest.java        | 12 ++++++++----
 .../ConsumerHeartbeatRequestManagerTest.java       | 22 ++++++++++++++++++++++
 5 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index b310f6de1b9..7e923ca0e46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -307,6 +307,7 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
         resetHeartbeatState();
         membershipManager().onHeartbeatFailure(exception instanceof RetriableException);
         if (exception instanceof RetriableException) {
+            coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
             String message = String.format("%s failed because of the retriable exception. Will retry in %s ms: %s",
                 heartbeatRequestName(),
                 heartbeatRequestState.remainingBackoffMs(responseTimeMs),
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 814250e0949..939d078ebaf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.StaleMemberEpochException;
@@ -565,12 +564,6 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
         return pendingRequests.unsentOffsetFetches;
     }
 
-    private void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs) {
-        if (exception instanceof DisconnectException) {
-            coordinatorRequestManager.markCoordinatorUnknown(exception.getMessage(), currentTimeMs);
-        }
-    }
-
     /**
      * Update latest member ID and epoch used by the member.
      *
@@ -883,7 +876,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
                 } else {
                     log.debug("{} completed with error", requestDescription(), error);
                     onFailedAttempt(requestCompletionTimeMs);
-                    handleCoordinatorDisconnect(error, requestCompletionTimeMs);
+                    coordinatorRequestManager.handleCoordinatorDisconnect(error, requestCompletionTimeMs);
                     future().completeExceptionally(error);
                 }
             } catch (Throwable t) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index 5144b41f574..a9e1bf46bed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
@@ -122,6 +123,21 @@ public class CoordinatorRequestManager implements RequestManager {
         });
     }
 
+    /**
+     * Handles the disconnection of the current coordinator.
+     * This method checks if the given exception is an instance of {@link DisconnectException}.
+     * If so, it marks the coordinator as unknown, indicating that the client should
+     * attempt to discover a new coordinator. For any other exception type, no action is performed.
+     *
+     * @param exception     The exception to handle, which was received as part of a request response.
+     * @param currentTimeMs The current time in milliseconds.
+     */
+    public void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs) {
+        if (exception instanceof DisconnectException) {
+            markCoordinatorUnknown(exception.getMessage(), currentTimeMs);
+        }
+    }
+
     /**
      * Mark the current coordinator null.
      *
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 33d2d6aae1b..523abf4125c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -459,7 +459,7 @@ public class CommitRequestManagerTest {
         // Commit should mark the coordinator unknown and fail with RetriableCommitFailedException.
         assertTrue(commitResult.isDone());
         assertFutureThrows(commitResult, RetriableCommitFailedException.class);
-        assertCoordinatorDisconnect();
+        assertCoordinatorDisconnectHandling();
     }
 
     @Test
@@ -752,7 +752,7 @@ public class CommitRequestManagerTest {
         // Request not completed just yet
         assertFalse(result.isDone());
         if (shouldRediscoverCoordinator) {
-            assertCoordinatorDisconnect();
+            assertCoordinatorDisconnectOnCoordinatorError();
         }
 
         // Request should be retried with backoff.
@@ -782,7 +782,7 @@ public class CommitRequestManagerTest {
 
         // Request not completed just yet, but should have marked the coordinator unknown
         assertFalse(result.isDone());
-        assertCoordinatorDisconnect();
+        assertCoordinatorDisconnectHandling();
 
         time.sleep(retryBackoffMs);
         when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
@@ -922,7 +922,11 @@ public class CommitRequestManagerTest {
         assertRetryBackOff(commitRequestManager, retryBackoffMs);
     }
 
-    private void assertCoordinatorDisconnect() {
+    private void assertCoordinatorDisconnectHandling() {
+        verify(coordinatorRequestManager).handleCoordinatorDisconnect(any(), anyLong());
+    }
+
+    private void assertCoordinatorDisconnectOnCoordinatorError() {
         verify(coordinatorRequestManager).markCoordinatorUnknown(any(), anyLong());
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index f392f87489d..ee5c56296fd 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -380,6 +381,27 @@ public class ConsumerHeartbeatRequestManagerTest {
         assertEquals(1, result.unsentRequests.size());
     }
 
+    @Test
+    public void testDisconnect() {
+        createHeartbeatRequestStateWithZeroHeartbeatInterval();
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        // Mimic disconnect
+        result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), DisconnectException.INSTANCE);
+        verify(membershipManager).onHeartbeatFailure(true);
+        // Ensure that the coordinatorManager rediscovers the coordinator
+        verify(coordinatorRequestManager).handleCoordinatorDisconnect(any(), anyLong());
+        verify(backgroundEventHandler, never()).add(any());
+
+        time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(0, result.unsentRequests.size(), "No request should be generated before the backoff expires");
+
+        time.sleep(1);
+        result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size(), "A new request should be generated after the backoff expires");
+    }
+
     @Test
     public void testFailureOnFatalException() {
         // The initial heartbeatInterval is set to 0

Reply via email to