This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d09e2228461 KAFKA-18189: CoordinatorRequestManager log message can include incorrect coordinator disconnect time (#18109)
d09e2228461 is described below
commit d09e2228461791d257db825b7862c11d7b46aba9
Author: Kirk True <[email protected]>
AuthorDate: Wed Dec 11 08:22:51 2024 -0800
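KAFKA-18189: CoordinatorRequestManager log message can include incorrect coordinator disconnect time (#18109)
Fixed the logic in markCoordinatorUnknown so that the start time of an ongoing disconnect is also recorded when the coordinator has not yet been discovered. As a result, the periodic "disconnected from the group coordinator" log message (emitted at DEBUG level) now contains the correct number of milliseconds the client has been disconnected.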
Reviewers: Christo Lolov <[email protected]>
---
.../internals/CoordinatorRequestManager.java | 28 +++++---
.../internals/CoordinatorRequestManagerTest.java | 80 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 8 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index c1e367055e4..4664267a0e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -139,22 +139,34 @@ public class CoordinatorRequestManager implements RequestManager {
}
/**
- * Mark the current coordinator null.
+ * Mark the coordinator as "unknown" (i.e. {@code null}) when a disconnect
is detected. This detection can occur
+ * in one of two paths:
*
- * @param cause why the coordinator is marked unknown.
- * @param currentTimeMs the current time in ms.
+ * <ol>
+ * <li>The coordinator was discovered, but then later disconnected</li>
+ * <li>The coordinator has not yet been discovered and/or
connected</li>
+ * </ol>
+ *
+ * @param cause String explanation of why the coordinator is
marked unknown
+ * @param currentTimeMs Current time in milliseconds
*/
public void markCoordinatorUnknown(final String cause, final long
currentTimeMs) {
- if (this.coordinator != null) {
- log.info("Group coordinator {} is unavailable or invalid due to
cause: {}. "
- + "Rediscovery will be attempted.", this.coordinator,
cause);
- this.coordinator = null;
+ if (coordinator != null || timeMarkedUnknownMs == -1) {
timeMarkedUnknownMs = currentTimeMs;
totalDisconnectedMin = 0;
+ }
+
+ if (coordinator != null) {
+ log.info(
+ "Group coordinator {} is unavailable or invalid due to cause:
{}. Rediscovery will be attempted.",
+ coordinator,
+ cause
+ );
+ coordinator = null;
} else {
long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs -
timeMarkedUnknownMs);
long currDisconnectMin = durationOfOngoingDisconnectMs /
COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
- if (currDisconnectMin > this.totalDisconnectedMin) {
+ if (currDisconnectMin > totalDisconnectedMin) {
log.debug("Consumer has been disconnected from the group
coordinator for {}ms", durationOfOngoingDisconnectMs);
totalDisconnectedMin = currDisconnectMin;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index c003574a23f..4f59db3d863 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -28,16 +28,24 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.log4j.Level;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,6 +83,78 @@ public class CoordinatorRequestManagerTest {
assertEquals(Collections.emptyList(), pollResult.unsentRequests);
}
+ /**
+ * This test mimics a client that has been disconnected from the
coordinator. When the client remains disconnected
+ * from the coordinator for 60 seconds, the client will begin to emit a
warning log every minute thereafter to
+ * alert the user about the ongoing disconnect status. The warning log
includes the length of time of the ongoing
+ * disconnect:
+ *
+ * <code>
+ * Consumer has been disconnected from the group coordinator for
XXXXXms
+ * </code>
+ *
+ * <p/>
+ *
+ * However, the logic used to calculate the length of the disconnect was
not correct. This test exercises the
+ * disconnect logic, controlling the logging and system time, to ensure
the warning message is correct.
+ *
+ * @see CoordinatorRequestManager#markCoordinatorUnknown(String, long)
+ */
+ @Test
+ public void testMarkCoordinatorUnknownLoggingAccuracy() {
+ long oneMinute = 60000;
+
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ // You'd be forgiven for assuming that a warning message would be
logged at WARN, but
+ // markCoordinatorUnknown logs the warning at DEBUG. This is
partly for historical parity with the
+ // ClassicKafkaConsumer.
+ appender.setClassLogger(CoordinatorRequestManager.class,
Level.DEBUG);
+ CoordinatorRequestManager coordinatorRequestManager =
setupCoordinatorManager(GROUP_ID);
+ assertFalse(coordinatorRequestManager.coordinator().isPresent());
+
+ // Step 1: mark the coordinator as disconnected right after
creation of the CoordinatorRequestManager.
+ // Because the disconnect occurred immediately, no warning should
be logged.
+ coordinatorRequestManager.markCoordinatorUnknown("test",
time.milliseconds());
+ assertTrue(millisecondsFromLog(appender).isEmpty());
+
+ // Step 2: sleep for one minute and mark the coordinator unknown
again. Then verify that the warning was
+ // logged and the reported time is accurate.
+ time.sleep(oneMinute);
+ coordinatorRequestManager.markCoordinatorUnknown("test",
time.milliseconds());
+ Optional<Long> firstLogMs = millisecondsFromLog(appender);
+ assertTrue(firstLogMs.isPresent());
+ assertEquals(oneMinute, firstLogMs.get());
+
+ // Step 3: sleep for *another* minute, mark the coordinator
unknown again, and verify the accuracy.
+ time.sleep(oneMinute);
+ coordinatorRequestManager.markCoordinatorUnknown("test",
time.milliseconds());
+ Optional<Long> secondLogMs = millisecondsFromLog(appender);
+ assertTrue(secondLogMs.isPresent());
+ assertEquals(oneMinute * 2, secondLogMs.get());
+ }
+ }
+
+ private Optional<Long> millisecondsFromLog(LogCaptureAppender appender) {
+ Pattern pattern = Pattern.compile("\\s+(?<millis>\\d+)+ms");
+ List<Long> milliseconds = appender.getMessages().stream()
+ .map(pattern::matcher)
+ .filter(Matcher::find)
+ .map(matcher -> matcher.group("millis"))
+ .filter(Objects::nonNull)
+ .map(millisString -> {
+ try {
+ return Long.parseLong(millisString);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ // Return the most recent log entry that matches the message in
markCoordinatorUnknown, if present.
+ return milliseconds.isEmpty() ? Optional.empty() :
Optional.of(milliseconds.get(milliseconds.size() - 1));
+ }
+
@Test
public void testMarkCoordinatorUnknown() {
CoordinatorRequestManager coordinatorManager =
setupCoordinatorManager(GROUP_ID);