This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d09e2228461 KAFKA-18189: CoordinatorRequestManager log message can include incorrect coordinator disconnect time (#18109)
d09e2228461 is described below

commit d09e2228461791d257db825b7862c11d7b46aba9
Author: Kirk True <[email protected]>
AuthorDate: Wed Dec 11 08:22:51 2024 -0800

    KAFKA-18189: CoordinatorRequestManager log message can include incorrect coordinator disconnect time (#18109)
    
    Fixed logic in markCoordinatorUnknown to ensure the warning log contains the correct number of milliseconds the client has been disconnected.
    
    Reviewers: Christo Lolov <[email protected]>
---
 .../internals/CoordinatorRequestManager.java       | 28 +++++---
 .../internals/CoordinatorRequestManagerTest.java   | 80 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index c1e367055e4..4664267a0e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -139,22 +139,34 @@ public class CoordinatorRequestManager implements RequestManager {
     }
 
     /**
-     * Mark the current coordinator null.
+     * Mark the coordinator as "unknown" (i.e. {@code null}) when a disconnect 
is detected. This detection can occur
+     * in one of two paths:
      *
-     * @param cause         why the coordinator is marked unknown.
-     * @param currentTimeMs the current time in ms.
+     * <ol>
+     *     <li>The coordinator was discovered, but then later disconnected</li>
+     *     <li>The coordinator has not yet been discovered and/or 
connected</li>
+     * </ol>
+     *
+     * @param cause         String explanation of why the coordinator is 
marked unknown
+     * @param currentTimeMs Current time in milliseconds
      */
     public void markCoordinatorUnknown(final String cause, final long 
currentTimeMs) {
-        if (this.coordinator != null) {
-            log.info("Group coordinator {} is unavailable or invalid due to 
cause: {}. "
-                    + "Rediscovery will be attempted.", this.coordinator, 
cause);
-            this.coordinator = null;
+        if (coordinator != null || timeMarkedUnknownMs == -1) {
             timeMarkedUnknownMs = currentTimeMs;
             totalDisconnectedMin = 0;
+        }
+
+        if (coordinator != null) {
+            log.info(
+                "Group coordinator {} is unavailable or invalid due to cause: 
{}. Rediscovery will be attempted.",
+                coordinator,
+                cause
+            );
+            coordinator = null;
         } else {
             long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
             long currDisconnectMin = durationOfOngoingDisconnectMs / 
COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
-            if (currDisconnectMin > this.totalDisconnectedMin) {
+            if (currDisconnectMin > totalDisconnectedMin) {
                 log.debug("Consumer has been disconnected from the group 
coordinator for {}ms", durationOfOngoingDisconnectMs);
                 totalDisconnectedMin = currDisconnectMin;
             }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index c003574a23f..4f59db3d863 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -28,16 +28,24 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 
+import org.apache.log4j.Level;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,6 +83,78 @@ public class CoordinatorRequestManagerTest {
         assertEquals(Collections.emptyList(), pollResult.unsentRequests);
     }
 
+    /**
+     * This test mimics a client that has been disconnected from the 
coordinator. When the client remains disconnected
+     * from the coordinator for 60 seconds, the client will begin to emit a 
warning log every minute thereafter to
+     * alert the user about the ongoing disconnect status. The warning log 
includes the length of time of the ongoing
+     * disconnect:
+     *
+     * <code>
+     *     Consumer has been disconnected from the group coordinator for 
XXXXXms
+     * </code>
+     *
+     * <p/>
+     *
+     * However, the logic used to calculate the length of the disconnect was 
not correct. This test exercises the
+     * disconnect logic, controlling the logging and system time, to ensure 
the warning message is correct.
+     *
+     * @see CoordinatorRequestManager#markCoordinatorUnknown(String, long)
+     */
+    @Test
+    public void testMarkCoordinatorUnknownLoggingAccuracy() {
+        long oneMinute = 60000;
+
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            // You'd be forgiven for assuming that a warning message would be 
logged at WARN, but
+            // markCoordinatorUnknown logs the warning at DEBUG. This is 
partly for historical parity with the
+            // ClassicKafkaConsumer.
+            appender.setClassLogger(CoordinatorRequestManager.class, 
Level.DEBUG);
+            CoordinatorRequestManager coordinatorRequestManager = 
setupCoordinatorManager(GROUP_ID);
+            assertFalse(coordinatorRequestManager.coordinator().isPresent());
+
+            // Step 1: mark the coordinator as disconnected right after 
creation of the CoordinatorRequestManager.
+            // Because the disconnect occurred immediately, no warning should 
be logged.
+            coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
+            assertTrue(millisecondsFromLog(appender).isEmpty());
+
+            // Step 2: sleep for one minute and mark the coordinator unknown 
again. Then verify that the warning was
+            // logged and the reported time is accurate.
+            time.sleep(oneMinute);
+            coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
+            Optional<Long> firstLogMs = millisecondsFromLog(appender);
+            assertTrue(firstLogMs.isPresent());
+            assertEquals(oneMinute, firstLogMs.get());
+
+            // Step 3: sleep for *another* minute, mark the coordinator 
unknown again, and verify the accuracy.
+            time.sleep(oneMinute);
+            coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
+            Optional<Long> secondLogMs = millisecondsFromLog(appender);
+            assertTrue(secondLogMs.isPresent());
+            assertEquals(oneMinute * 2, secondLogMs.get());
+        }
+    }
+
+    private Optional<Long> millisecondsFromLog(LogCaptureAppender appender) {
+        Pattern pattern = Pattern.compile("\\s+(?<millis>\\d+)+ms");
+        List<Long> milliseconds = appender.getMessages().stream()
+            .map(pattern::matcher)
+            .filter(Matcher::find)
+            .map(matcher -> matcher.group("millis"))
+            .filter(Objects::nonNull)
+            .map(millisString -> {
+                try {
+                    return Long.parseLong(millisString);
+                } catch (NumberFormatException e) {
+                    return null;
+                }
+            })
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
+
+        // Return the most recent log entry that matches the message in 
markCoordinatorUnknown, if present.
+        return milliseconds.isEmpty() ? Optional.empty() : 
Optional.of(milliseconds.get(milliseconds.size() - 1));
+    }
+
     @Test
     public void testMarkCoordinatorUnknown() {
         CoordinatorRequestManager coordinatorManager = 
setupCoordinatorManager(GROUP_ID);

Reply via email to