This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c2e5daf606 MINOR: Revert log level changes in LogCaptureAppender 
(#14436)
9c2e5daf606 is described below

commit 9c2e5daf606aebb343224f943b8e6bac4351c193
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Sep 26 10:49:41 2023 +0200

    MINOR: Revert log level changes in LogCaptureAppender (#14436)
    
    LogCaptureAppender sets the log level in various tests to check if a 
certain log message is produced. The log level is however never reverted, 
changing the log level across the board and introducing flakiness due to 
non-determinism since the log level depends on execution order. Some log 
messages change the timing inside tests significantly.
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../kafka/common/utils/LogCaptureAppender.java     | 24 ++++++++++++++++++++--
 .../connect/mirror/MirrorSourceConnectorTest.java  |  2 +-
 .../runtime/SourceTaskOffsetCommitterTest.java     |  2 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |  4 ++--
 .../apache/kafka/streams/StreamsConfigTest.java    |  6 +++---
 .../internals/InternalTopicManagerTest.java        |  2 +-
 .../processor/internals/PartitionGroupTest.java    | 14 ++++++-------
 .../internals/StoreChangelogReaderTest.java        |  2 +-
 .../processor/internals/TaskManagerTest.java       |  2 +-
 9 files changed, 39 insertions(+), 19 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java 
b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
index 0d569af30a5..4f035840bd2 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java
@@ -27,6 +27,20 @@ import java.util.Optional;
 
 public class LogCaptureAppender extends AppenderSkeleton implements 
AutoCloseable {
     private final List<LoggingEvent> events = new LinkedList<>();
+    private final List<LogLevelChange> logLevelChanges = new LinkedList<>();
+
+    public static class LogLevelChange {
+
+        public LogLevelChange(final Level originalLevel, final Class<?> clazz) 
{
+            this.originalLevel = originalLevel;
+            this.clazz = clazz;
+        }
+
+        private final Level originalLevel;
+
+        private final Class<?> clazz;
+
+    }
 
     @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
     public static class Event {
@@ -65,11 +79,13 @@ public class LogCaptureAppender extends AppenderSkeleton 
implements AutoCloseabl
         return logCaptureAppender;
     }
 
-    public static void setClassLoggerToDebug(final Class<?> clazz) {
+    public void setClassLoggerToDebug(final Class<?> clazz) {
+        logLevelChanges.add(new 
LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz));
         Logger.getLogger(clazz).setLevel(Level.DEBUG);
     }
 
-    public static void setClassLoggerToTrace(final Class<?> clazz) {
+    public void setClassLoggerToTrace(final Class<?> clazz) {
+        logLevelChanges.add(new 
LogLevelChange(Logger.getLogger(clazz).getLevel(), clazz));
         Logger.getLogger(clazz).setLevel(Level.TRACE);
     }
 
@@ -120,6 +136,10 @@ public class LogCaptureAppender extends AppenderSkeleton 
implements AutoCloseabl
 
     @Override
     public void close() {
+        for (final LogLevelChange logLevelChange : logLevelChanges) {
+            
Logger.getLogger(logLevelChange.clazz).setLevel(logLevelChange.originalLevel);
+        }
+        logLevelChanges.clear();
         unregister(this);
     }
 
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index bc56d3ee8ea..04cd08000f9 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -197,7 +197,7 @@ public class MirrorSourceConnectorTest {
         when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult);
 
         try (LogCaptureAppender connectorLogs = 
LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) {
-            
LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class);
+            connectorLogs.setClassLoggerToTrace(MirrorSourceConnector.class);
             connector.syncTopicAcls();
             long aclSyncDisableMessages = connectorLogs.getMessages().stream()
                     .filter(m -> m.contains("Consider disabling topic ACL 
syncing"))
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 6c3a85531cd..ef962cf5b44 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -142,7 +142,7 @@ public class SourceTaskOffsetCommitterTest {
 
         committers.put(taskId, taskFuture);
         try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
-            
LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
+            
logCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
             committer.remove(taskId);
             assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("TRACE")));
         }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 719dc10d5b1..dd347faca89 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -990,10 +990,10 @@ public class WorkerSourceTaskTest {
     private void assertShouldSkipCommit() {
         assertFalse(workerTask.shouldCommitOffsets());
 
-        
LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
-        LogCaptureAppender.setClassLoggerToTrace(WorkerSourceTask.class);
         try (LogCaptureAppender committerAppender = 
LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class);
              LogCaptureAppender taskAppender = 
LogCaptureAppender.createAndRegister(WorkerSourceTask.class)) {
+            
committerAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);
+            taskAppender.setClassLoggerToTrace(WorkerSourceTask.class);
             SourceTaskOffsetCommitter.commit(workerTask);
             assertEquals(Collections.emptyList(), taskAppender.getMessages());
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 4223294c68a..35d7b3e797b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1065,8 +1065,8 @@ public class StreamsConfigTest {
     public void shouldLogWarningWhenEosAlphaIsUsed() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
 
-        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLoggerToDebug(StreamsConfig.class);
             new StreamsConfig(props);
 
             assertThat(
@@ -1085,8 +1085,8 @@ public class StreamsConfigTest {
     public void shouldLogWarningWhenEosBetaIsUsed() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_BETA);
 
-        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLoggerToDebug(StreamsConfig.class);
             new StreamsConfig(props);
 
             assertThat(
@@ -1103,8 +1103,8 @@ public class StreamsConfigTest {
     public void shouldLogWarningWhenRetriesIsUsed() {
         props.put(StreamsConfig.RETRIES_CONFIG, 0);
 
-        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLoggerToDebug(StreamsConfig.class);
             new StreamsConfig(props);
 
             assertThat(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index a81977d620a..22ab4971a4a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -880,8 +880,8 @@ public class InternalTopicManagerTest {
         topicConfigMap.put(topic1, internalTopicConfig);
         topicConfigMap.put("internal-topic", internalTopicConfigII);
 
-        LogCaptureAppender.setClassLoggerToDebug(InternalTopicManager.class);
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(InternalTopicManager.class)) {
+            appender.setClassLoggerToDebug(InternalTopicManager.class);
             internalTopicManager.makeReady(topicConfigMap);
 
             assertThat(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 599068c0f3c..87ac1b762f2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -564,7 +564,7 @@ public class PartitionGroupTest {
 
         assertThat(group.allPartitionsBufferedLocally(), is(false));
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(0L), is(true));
             assertThat(
                 appender.getEvents(),
@@ -607,7 +607,7 @@ public class PartitionGroupTest {
 
         assertThat(group.allPartitionsBufferedLocally(), is(true));
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(0L), is(true));
             assertThat(
                 appender.getEvents(),
@@ -641,7 +641,7 @@ public class PartitionGroupTest {
 
         assertThat(group.allPartitionsBufferedLocally(), is(false));
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(0L), is(false));
             assertThat(
                 appender.getEvents(),
@@ -680,7 +680,7 @@ public class PartitionGroupTest {
         assertThat(group.allPartitionsBufferedLocally(), is(false));
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(0L), is(false));
             assertThat(
                 appender.getEvents(),
@@ -714,7 +714,7 @@ public class PartitionGroupTest {
         assertThat(group.allPartitionsBufferedLocally(), is(false));
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(0L), is(false));
             assertThat(
                 appender.getEvents(),
@@ -726,7 +726,7 @@ public class PartitionGroupTest {
         }
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(1L), is(true));
             assertThat(
                 appender.getEvents(),
@@ -745,7 +745,7 @@ public class PartitionGroupTest {
         }
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
-            LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class);
+            appender.setClassLoggerToTrace(PartitionGroup.class);
             assertThat(group.readyToProcess(2L), is(true));
             assertThat(
                 appender.getEvents(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index c764d6ede2a..e544f485f46 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -1303,8 +1303,8 @@ public class StoreChangelogReaderTest extends 
EasyMockSupport {
 
     @Test
     public void shouldNotThrowOnUnknownRevokedPartition() {
-        LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class);
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StoreChangelogReader.class)) {
+            appender.setClassLoggerToDebug(StoreChangelogReader.class);
             changelogReader.unregister(Collections.singletonList(new 
TopicPartition("unknown", 0)));
 
             assertThat(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 235e3c50c8f..5ac9198ac84 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4155,7 +4155,7 @@ public class TaskManagerTest {
         replay(consumer);
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(TaskManager.class)) {
-            LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
+            appender.setClassLoggerToDebug(TaskManager.class);
             taskManager.handleAssignment(taskId00Assignment, emptyMap());
             
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), 
is(true));
             assertThat(task00.state(), is(Task.State.RUNNING));

Reply via email to