This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6eb6a5e578e KAFKA-18776: Fix flaky coordinator disconnect test & fix
log level (#18866)
6eb6a5e578e is described below
commit 6eb6a5e578ec107c32cbb2fa49902db6658e9b1e
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Feb 13 12:11:45 2025 -0500
KAFKA-18776: Fix flaky coordinator disconnect test & fix log level (#18866)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../clients/consumer/internals/CoordinatorRequestManager.java | 2 +-
.../clients/consumer/internals/CoordinatorRequestManagerTest.java | 7 ++-----
2 files changed, 3 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
index dd53ae11790..2c9c72e0520 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java
@@ -175,7 +175,7 @@ public class CoordinatorRequestManager implements
RequestManager {
long durationOfOngoingDisconnectMs = Math.max(0, currentTimeMs -
timeMarkedUnknownMs);
long currDisconnectMin = durationOfOngoingDisconnectMs /
COORDINATOR_DISCONNECT_LOGGING_INTERVAL_MS;
if (currDisconnectMin > totalDisconnectedMin) {
- log.debug("Consumer has been disconnected from the group
coordinator for {}ms", durationOfOngoingDisconnectMs);
+ log.warn("Consumer has been disconnected from the group
coordinator for {}ms", durationOfOngoingDisconnectMs);
totalDisconnectedMin = currDisconnectMin;
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
index 0ed902d7f27..c22af52cf89 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java
@@ -103,10 +103,7 @@ public class CoordinatorRequestManagerTest {
long oneMinute = 60000;
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
- // You'd be forgiven for assuming that a warning message would be
logged at WARN, but
- // markCoordinatorUnknown logs the warning at DEBUG. This is
partly for historical parity with the
- // ClassicKafkaConsumer.
- appender.setClassLogger(CoordinatorRequestManager.class,
Level.DEBUG);
+ appender.setClassLogger(CoordinatorRequestManager.class,
Level.WARN);
CoordinatorRequestManager coordinatorRequestManager =
setupCoordinatorManager(GROUP_ID);
assertFalse(coordinatorRequestManager.coordinator().isPresent());
@@ -133,7 +130,7 @@ public class CoordinatorRequestManagerTest {
}
private Optional<Long> millisecondsFromLog(LogCaptureAppender appender) {
- Pattern pattern = Pattern.compile("\\s+(?<millis>\\d+)+ms");
+ Pattern pattern = Pattern.compile("^Consumer has been disconnected
from the group coordinator for (?<millis>\\d+)+ms$");
List<Long> milliseconds = appender.getMessages().stream()
.map(pattern::matcher)
.filter(Matcher::find)