This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6eb6a5e578e KAFKA-18776: Fix flaky coordinator disconnect test & fix 
log level (#18866)
6eb6a5e578e is described below

commit 6eb6a5e578ec107c32cbb2fa49902db6658e9b1e
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Feb 13 12:11:45 2025 -0500

    KAFKA-18776: Fix flaky coordinator disconnect test & fix log level (#18866)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../clients/consumer/internals/CoordinatorRequestManager.java      | 2 +-
 .../clients/consumer/internals/CoordinatorRequestManagerTest.java  | 7 ++-----
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index dd53ae11790..2c9c72e0520 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -175,7 +175,7 @@ public class CoordinatorRequestManager implements 
RequestManager {
             long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs - 
timeMarkedUnknownMs);
             long currDisconnectMin = durationOfOngoingDisconnectMs / 
COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
             if (currDisconnectMin > totalDisconnectedMin) {
-                log.debug("Consumer has been disconnected from the group 
coordinator for {}ms", durationOfOngoingDisconnectMs);
+                log.warn("Consumer has been disconnected from the group 
coordinator for {}ms", durationOfOngoingDisconnectMs);
                 totalDisconnectedMin = currDisconnectMin;
             }
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 0ed902d7f27..c22af52cf89 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -103,10 +103,7 @@ public class CoordinatorRequestManagerTest {
         long oneMinute = 60000;
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
-            // You'd be forgiven for assuming that a warning message would be 
logged at WARN, but
-            // markCoordinatorUnknown logs the warning at DEBUG. This is 
partly for historical parity with the
-            // ClassicKafkaConsumer.
-            appender.setClassLogger(CoordinatorRequestManager.class, 
Level.DEBUG);
+            appender.setClassLogger(CoordinatorRequestManager.class, 
Level.WARN);
             CoordinatorRequestManager coordinatorRequestManager = 
setupCoordinatorManager(GROUP_ID);
             assertFalse(coordinatorRequestManager.coordinator().isPresent());
 
@@ -133,7 +130,7 @@ public class CoordinatorRequestManagerTest {
     }
 
     private Optional<Long> millisecondsFromLog(LogCaptureAppender appender) {
-        Pattern pattern = Pattern.compile("\\s+(?<millis>\\d+)+ms");
+        Pattern pattern = Pattern.compile("^Consumer has been disconnected 
from the group coordinator for (?<millis>\\d+)+ms$");
         List<Long> milliseconds = appender.getMessages().stream()
             .map(pattern::matcher)
             .filter(Matcher::find)

Reply via email to