This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9c2e5daf606 MINOR: Revert log level changes in LogCaptureAppender
(#14436)
9c2e5daf606 is described below
commit 9c2e5daf606aebb343224f943b8e6bac4351c193
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 26 10:49:41 2023 +0200
MINOR: Revert log level changes in LogCaptureAppender (#14436)
LogCaptureAppender sets the log level in various tests to check if a
certain log message is produced. The log level is however never reverted,
changing the log level across the board and introducing flakiness due to
non-determinism since the log level depends on execution order. Some log
messages change the timing inside tests significantly.
Reviewer: Bruno Cadonna <[email protected]>
---
.../kafka/common/utils/LogCaptureAppender.java | 24 ++++++++++++++++++++--
.../connect/mirror/MirrorSourceConnectorTest.java | 2 +-
.../runtime/SourceTaskOffsetCommitterTest.java | 2 +-
.../connect/runtime/WorkerSourceTaskTest.java | 4 ++--
.../apache/kafka/streams/StreamsConfigTest.java | 6 +++---
.../internals/InternalTopicManagerTest.java | 2 +-
.../processor/internals/PartitionGroupTest.java | 14 ++++++-------
.../internals/StoreChangelogReaderTest.java | 2 +-
.../processor/internals/TaskManagerTest.java | 2 +-
9 files changed, 39 insertions(+), 19 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
index 0d569af30a5..4f035840bd2 100644
---
a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
+++
b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
@@ -27,6 +27,20 @@ import java.util.Optional;
public class LogCaptureAppender extends AppenderSkeleton implements
AutoCloseable {
private final List<LoggingEvent> events = new LinkedList<>();
+ private final List<LogLevelChange> logLevelChanges = new LinkedList<>();
+
+ public static class LogLevelChange {
+
+ public LogLevelChange(final Level originalLevel, final Class<?> clazz)
{
+ this.originalLevel = originalLevel;
+ this.clazz = clazz;
+ }
+
+ private final Level originalLevel;
+
+ private final Class<?> clazz;
+
+ }
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public static class Event {
@@ -65,11 +79,13 @@ public class LogCaptureAppender extends AppenderSkeleton
implements AutoCloseabl
return logCaptureAppender;
}
- public static void setClassLoggerToDebug(final Class<?> clazz) {
+ public void setClassLoggerToDebug(final Class<?> clazz) {
+ logLevelChanges.add(new
LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz));
Logger.getLogger(clazz).setLevel(Level.DEBUG);
}
- public static void setClassLoggerToTrace(final Class<?> clazz) {
+ public void setClassLoggerToTrace(final Class<?> clazz) {
+ logLevelChanges.add(new
LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz));
Logger.getLogger(clazz).setLevel(Level.TRACE);
}
@@ -120,6 +136,10 @@ public class LogCaptureAppender extends AppenderSkeleton
implements AutoCloseabl
@Override
public void close() {
+ for (final LogLevelChange logLevelChange : logLevelChanges) {
+
Logger.getLogger(logLevelChange.clazz).setLevel(logLevelChange.originalLevel);
+ }
+ logLevelChanges.clear();
unregister(this);
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index bc56d3ee8ea..04cd08000f9 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -197,7 +197,7 @@ public class MirrorSourceConnectorTest {
when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult);
try (LogCaptureAppender connectorLogs =
LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) {
-
LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class);
+ connectorLogs.setClassLoggerToTrace(MirrorSourceConnector.class);
connector.syncTopicAcls();
long aclSyncDisableMessages = connectorLogs.getMessages().stream()
.filter(m -> m.contains("Consider disabling topic ACL
syncing"))
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 6c3a85531cd..ef962cf5b44 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -142,7 +142,7 @@ public class SourceTaskOffsetCommitterTest {
committers.put(taskId, taskFuture);
try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
-
LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
+
logCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
committer.remove(taskId);
assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("TRACE")));
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 719dc10d5b1..dd347faca89 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -990,10 +990,10 @@ public class WorkerSourceTaskTest {
private void assertShouldSkipCommit() {
assertFalse(workerTask.shouldCommitOffsets());
-
LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
- LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class);
try (LogCaptureAppender committerAppender =
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class);
LogCaptureAppender taskAppender =
LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) {
+
committerAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
+ taskAppender.setClassLoggerToTrace(WorkerSourceTask.class);
SourceTaskOffsetCommitter.commit(workerTask);
assertEquals(Collections.emptyList(), taskAppender.getMessages());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 4223294c68a..35d7b3e797b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1065,8 +1065,8 @@ public class StreamsConfigTest {
public void shouldLogWarningWhenEosAlphaIsUsed() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE);
- LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLoggerToDebug(StreamsConfig.class);
new StreamsConfig(props);
assertThat(
@@ -1085,8 +1085,8 @@ public class StreamsConfigTest {
public void shouldLogWarningWhenEosBetaIsUsed() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_BETA);
- LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLoggerToDebug(StreamsConfig.class);
new StreamsConfig(props);
assertThat(
@@ -1103,8 +1103,8 @@ public class StreamsConfigTest {
public void shouldLogWarningWhenRetriesIsUsed() {
props.put(StreamsConfig.RETRIES_CONFIG, 0);
- LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLoggerToDebug(StreamsConfig.class);
new StreamsConfig(props);
assertThat(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index a81977d620a..22ab4971a4a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -880,8 +880,8 @@ public class InternalTopicManagerTest {
topicConfigMap.put(topic1, internalTopicConfig);
topicConfigMap.put("internal-topic", internalTopicConfigII);
- LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
+ appender.setClassLoggerToDebug(InternalTopicManager.class);
internalTopicManager.makeReady(topicConfigMap);
assertThat(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 599068c0f3c..87ac1b762f2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -564,7 +564,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(true));
assertThat(
appender.getEvents(),
@@ -607,7 +607,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(true));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(true));
assertThat(
appender.getEvents(),
@@ -641,7 +641,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(false));
assertThat(
appender.getEvents(),
@@ -680,7 +680,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(false));
assertThat(
appender.getEvents(),
@@ -714,7 +714,7 @@ public class PartitionGroupTest {
assertThat(group.allPartitionsBufferedLocally(), is(false));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(0L), is(false));
assertThat(
appender.getEvents(),
@@ -726,7 +726,7 @@ public class PartitionGroupTest {
}
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(1L), is(true));
assertThat(
appender.getEvents(),
@@ -745,7 +745,7 @@ public class PartitionGroupTest {
}
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
- LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+ appender.setClassLoggerToTrace(PartitionGroup.class);
assertThat(group.readyToProcess(2L), is(true));
assertThat(
appender.getEvents(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c764d6ede2a..e544f485f46 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -1303,8 +1303,8 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
@Test
public void shouldNotThrowOnUnknownRevokedPartition() {
- LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) {
+ appender.setClassLoggerToDebug(StoreChangelogReader.class);
changelogReader.unregister(Collections.singletonList(new
TopicPartition("unknown", 0)));
assertThat(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 235e3c50c8f..5ac9198ac84 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4155,7 +4155,7 @@ public class TaskManagerTest {
replay(consumer);
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(TaskManager.class)) {
- LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
+ appender.setClassLoggerToDebug(TaskManager.class);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null),
is(true));
assertThat(task00.state(), is(Task.State.RUNNING));