gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1357258143


##########
checkstyle/suppressions.xml:
##########
@@ -139,6 +139,8 @@
               files="Worker(SinkTask|SourceTask|Coordinator).java"/>
     <suppress checks="ParameterNumber"
               files="(ConfigKeyInfo|DistributedHerder).java"/>
+    <suppress checks="DefaultComesLast"
+              files="LoggingResource.java" />

Review Comment:
   nit: we could probably avoid this warning with enum parsing and a fallback enum.
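
   For illustration, a minimal sketch of that idea (the `Scope` enum and its `parse` helper here are hypothetical, not from the PR): parse the query parameter up front and fall back to the worker scope on anything unrecognized, so the switch over scopes no longer needs a non-final `default` branch:

   ```java
   import java.util.Locale;

   enum Scope {
       WORKER, CLUSTER;

       // Fallback enum: null or unrecognized scopes map to WORKER instead of
       // being handled by a default case in the middle of a switch.
       static Scope parse(String value) {
           if (value == null)
               return WORKER;
           try {
               return Scope.valueOf(value.toUpperCase(Locale.ROOT));
           } catch (IllegalArgumentException e) {
               return WORKER;
           }
       }
   }
   ```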



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -297,8 +297,8 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         // When automatic topic creation is disabled on the broker
         brokerProps.put("auto.create.topics.enable", "false");
         connect = connectBuilder
-            .brokerProps(brokerProps)
             .numWorkers(1)
+            .brokerProps(brokerProps)

Review Comment:
   nit: is this necessary?



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met
             wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120,
                        err_msg="Failed to see connector startup in PAUSED state")
 
+    @cluster(num_nodes=5)
+    def test_dynamic_logging(self):
+        """
+        Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis.
+        """
+
+        self.setup_services(num_workers=3)
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        worker = self.cc.nodes[0]
+        prior_all_loggers = self.cc.get_all_loggers(worker)
+        self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers))
+        assert prior_all_loggers is not None
+        assert 'root' in prior_all_loggers
+        # We need root and at least one other namespace (the other namespace is checked
+        # later on to make sure that it hasn't changed)
+        assert len(prior_all_loggers) >= 2
+        for logger in prior_all_loggers.values():
+            assert logger['last_modified'] is None
+
+        namespace = None
+        for logger in prior_all_loggers.keys():
+            if logger != 'root':
+                namespace = logger
+                break
+        assert namespace is not None
+
+        initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+        # Make sure we pick a different one than what's already set for that namespace
+        new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+        self.cc.set_logger(worker, namespace, 'ERROR')
+        request_time = int(time.time() * 1000)
+        affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+        assert len(affected_loggers) >= 1
+        for logger in affected_loggers:
+            assert logger.startswith(namespace)
+
+        assert self.loggers_set(new_level, request_time, namespace, workers=[worker])
+        assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:])
+
+        # Force all loggers to get updated by setting the root namespace to
+        # two different levels
+        # This guarantees that their last-modified times will be updated
+        resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+        assert resp is None
+
+        new_root = 'INFO'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_root, request_time),
+            # This should be super quick--just a write+read of the config topic, which workers are constantly polling
+            timeout_sec=10,
+            err_msg="Log level for root namespace was not adjusted in a reasonable amount of time."
+        )
+
+        new_level = 'DEBUG'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_level, request_time, namespace),
+            timeout_sec=10,
+            err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.'
+        )
+
+        prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes]
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+
+        prior_namespace = namespace
+        new_namespace = None
+        for logger, level in prior_all_loggers[0].items():
+            if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   In order for this to be true, there must be at least two non-overlapping namespaces, not including root. With root included, does that mean that the earlier `assert len(prior_all_loggers) >= 2` should be 3 instead?
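
   If the intent is "root plus at least two distinct non-root namespaces", one way to make the check explicit is sketched below (hypothetical, not part of the PR; it reuses the test's `prior_all_loggers` dict):

   ```python
   # Count the non-root namespaces directly instead of relying on the total.
   non_root_loggers = [name for name in prior_all_loggers if name != 'root']
   assert len(non_root_loggers) >= 2
   ```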



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met
             wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120,
                        err_msg="Failed to see connector startup in PAUSED state")
 
+    @cluster(num_nodes=5)
+    def test_dynamic_logging(self):

Review Comment:
   This test is a bit repetitive and hard to read. Can you find some functions to pull out, or insert some "section" comments to visually separate the different stages?
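
   For example, a hypothetical sketch of one such extraction (the helper name is illustrative, not from the PR), covering the repeated set-level-then-wait pattern for cluster-scoped requests:

   ```python
   def _set_cluster_level_and_wait(self, worker, namespace, level):
       # Issue a cluster-scoped level change and wait for it to propagate
       request_time = int(time.time() * 1000)
       assert self.cc.set_logger(worker, namespace, level, 'cluster') is None
       wait_until(
           lambda: self.loggers_set(level, request_time, namespace),
           # Should be quick--just a write+read of the config topic
           timeout_sec=10,
           err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.'
       )
   ```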



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##########
@@ -1232,6 +1191,16 @@ private AbstractHerder createConfigValidationHerder(Class<? extends Connector> c
         return herder;
     }
 
+    private AbstractHerder testHerder() {

Review Comment:
   Thanks for the cleanup!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java:
##########
@@ -94,122 +88,54 @@ public Response listLoggers() {
     public Response getLogger(final @PathParam("logger") String namedLogger) {
         Objects.requireNonNull(namedLogger, "require non-null name");
 
-        Logger logger = null;
-        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
-            logger = rootLogger();
-        } else {
-            Enumeration<Logger> en = currentLoggers();
-            // search within existing loggers for the given name.
-            // using LogManger.getLogger() will create a logger if it doesn't exist
-            // (potential leak since these don't get cleaned up).
-            while (en.hasMoreElements()) {
-                Logger l = en.nextElement();
-                if (namedLogger.equals(l.getName())) {
-                    logger = l;
-                    break;
-                }
-            }
-        }
-        if (logger == null) {
+        LoggerLevel loggerLevel = herder.loggerLevel(namedLogger);
+        if (loggerLevel == null)
             throw new NotFoundException("Logger " + namedLogger + " not found.");
-        } else {
-            return Response.ok(effectiveLevelToMap(logger)).build();
-        }
-    }
 
+        return Response.ok(loggerLevel).build();
+    }
 
     /**
      * Adjust level of a named logger. If the name corresponds to an ancestor, then the log level is applied to all child loggers.
      *
-     * @param namedLogger name of the logger
+     * @param namespace name of the logger
      * @param levelMap a map that is expected to contain one key 'level', and a value that is one of the log4j levels:
      *                 DEBUG, ERROR, FATAL, INFO, TRACE, WARN
      * @return names of loggers whose levels were modified
      */
     @PUT
     @Path("/{logger}")
     @Operation(summary = "Set the log level for the specified logger")
-    public Response setLevel(final @PathParam("logger") String namedLogger,
-                             final Map<String, String> levelMap) {
-        String desiredLevelStr = levelMap.get("level");
-        if (desiredLevelStr == null) {
-            throw new BadRequestException("Desired 'level' parameter was not specified in request.");
+    @SuppressWarnings("fallthrough")
+    public Response setLevel(final @PathParam("logger") String namespace,
+                             final Map<String, String> levelMap,
+                             @DefaultValue("worker") @QueryParam("scope") @Parameter(description = "The scope for the logging modification (single-worker, cluster-wide, etc.)") String scope) {
+        if (scope == null) {
+            log.warn("Received null scope in request to adjust logging level; will default to {}", WORKER_SCOPE);

Review Comment:
   nit: this should probably be debug or trace
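
   E.g., keeping the same message at debug:

   ```suggestion
            log.debug("Received null scope in request to adjust logging level; will default to {}", WORKER_SCOPE);
   ```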



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -81,7 +81,13 @@ def __init__(self, test_context):
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
+    def setup_services(self,

Review Comment:
   are you using this configurability anywhere?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.cli.ConnectStandalone;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;
+import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
+import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG;
+
+/**
+ * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster,
+ * set up any tmp directories, and clean them up afterwards. Methods on the same
+ * {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe.
+ */
+public class EmbeddedConnectStandalone extends EmbeddedConnect {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectStandalone.class);
+
+    private static final String REST_HOST_NAME = "localhost";
+
+    private final Map<String, String> workerProps;
+    private final String offsetsFile;
+
+    private WorkerHandle connectWorker;
+
+    private EmbeddedConnectStandalone(
+            int numBrokers,
+            Properties brokerProps,
+            boolean maskExitProcedures,
+            Map<String, String> clientProps,
+            Map<String, String> workerProps,
+            String offsetsFile
+    ) {
+        super(numBrokers, brokerProps, maskExitProcedures, clientProps);
+        this.workerProps = workerProps;
+        this.offsetsFile = offsetsFile;
+    }
+
+    @Override
+    public void startConnect() {
+        log.info("Starting standalone Connect worker");
+
+        workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers());
+        // use a random available port
+        workerProps.put(LISTENERS_CONFIG, "HTTP://" + REST_HOST_NAME + ":0");
+
+        workerProps.putIfAbsent(OFFSET_STORAGE_FILE_FILENAME_CONFIG, offsetsFile);
+        workerProps.putIfAbsent(KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        workerProps.putIfAbsent(VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        workerProps.putIfAbsent(PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");
+
+        Connect connect = new ConnectStandalone().startConnect(workerProps);
+        connectWorker = new WorkerHandle("standalone", connect);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("EmbeddedConnectStandalone(numBrokers= %d, workerProps= %s)",
+            numBrokers,
+            workerProps);
+    }
+
+    @Override
+    protected Set<WorkerHandle> workers() {
+        return connectWorker != null
+                ? Collections.singleton(connectWorker)
+                : Collections.emptySet();
+    }
+
+    public static class Builder extends EmbeddedConnectBuilder<EmbeddedConnectStandalone, Builder> {
+
+        private String offsetsFile = null;
+
+        public Builder offsetsFile(String offsetsFile) {
+            this.offsetsFile = offsetsFile;
+            return this;
+        }
+
+        @Override
+        protected EmbeddedConnectStandalone build(
+                int numBrokers,
+                Properties brokerProps,
+                boolean maskExitProcedures,
+                Map<String, String> clientProps,
+                Map<String, String> workerProps
+        ) {
+            if (offsetsFile == null)
+                offsetsFile = tempOffsetsFile();
+
+            return new EmbeddedConnectStandalone(
+                    numBrokers,
+                    brokerProps,
+                    maskExitProcedures,
+                    clientProps,
+                    workerProps,
+                    offsetsFile
+            );
+        }
+
+        private String tempOffsetsFile() {
+            try {
+                return TestUtils
+                        .tempFile("connect-standalone-offsets", "")

Review Comment:
   nit: you can specify null to get the default `.tmp` suffix, which prevents the filename randomness from becoming the file extension.
   ```suggestion
                           .tempFile("connect-standalone-offsets", null)
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Hierarchy;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class LoggersTest {
+
+    private static final long INITIAL_TIME = 1696951712135L;
+    private Time time;
+
+    @Before
+    public void setup() {
+        time = new MockTime(0, INITIAL_TIME, 0);
+    }
+
+    @Test
+    public void testGetLoggersIgnoresNullLevels() {
+        Logger root = logger("root");
+
+        Logger a = logger("a");
+        a.setLevel(null);
+        Logger b = logger("b");
+        b.setLevel(Level.INFO);
+
+        Loggers loggers = new TestLoggers(root, a, b);
+
+        Map<String, LoggerLevel> expectedLevels = Collections.singletonMap(
+                "b",
+                new LoggerLevel(Level.INFO.toString(), null)
+        );
+        Map<String, LoggerLevel> actualLevels = loggers.allLevels();
+        assertEquals(expectedLevels, actualLevels);
+    }
+
+    @Test
+    public void testGetLoggerFallsBackToEffectiveLogLevel() {
+        Logger root = logger("root");
+        root.setLevel(Level.ERROR);
+
+        Hierarchy hierarchy = new Hierarchy(root);
+        Logger a = hierarchy.getLogger("a");
+        a.setLevel(null);
+        Logger b = hierarchy.getLogger("b");
+        b.setLevel(Level.INFO);
+
+        Loggers loggers = new TestLoggers(root, a, b);
+
+        LoggerLevel expectedLevel = new LoggerLevel(Level.ERROR.toString(), null);
+        LoggerLevel actualLevel = loggers.level("a");
+        assertEquals(expectedLevel, actualLevel);
+    }
+
+    @Test
+    public void testGetUnknownLogger() {
+        Logger root = logger("root");
+        root.setLevel(Level.ERROR);
+
+        Hierarchy hierarchy = new Hierarchy(root);
+        Logger a = hierarchy.getLogger("a");
+        a.setLevel(null);
+        Logger b = hierarchy.getLogger("b");
+        b.setLevel(Level.INFO);
+
+        Loggers loggers = new TestLoggers(root, a, b);
+
+        LoggerLevel level = loggers.level("c");
+        assertNull(level);
+    }
+
+    @Test
+    public void testSetLevel() {
+        Logger root = logger("root");
+        root.setLevel(Level.ERROR);
+
+        Logger x = logger("a.b.c.p.X");
+        Logger y = logger("a.b.c.p.Y");
+        Logger z = logger("a.b.c.p.Z");
+        Logger w = logger("a.b.c.s.W");
+        x.setLevel(Level.INFO);
+        y.setLevel(Level.INFO);
+        z.setLevel(Level.INFO);
+        w.setLevel(Level.INFO);
+
+        // We don't explicitly register a logger for a.b.c.p, so it won't appear in the list of current loggers;
+        // one should be created by the Loggers instance when we set the level
+        TestLoggers loggers = new TestLoggers(root, x, y, z, w);
+
+        List<String> modified = loggers.setLevel("a.b.c.p", Level.DEBUG);
+        assertEquals(Arrays.asList("a.b.c.p", "a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z"), modified);
+        assertEquals(Level.DEBUG.toString(), loggers.level("a.b.c.p").level());
+        assertEquals(Level.DEBUG, x.getLevel());
+        assertEquals(Level.DEBUG, y.getLevel());
+        assertEquals(Level.DEBUG, z.getLevel());
+
+        LoggerLevel expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
+        LoggerLevel actualLevel = loggers.level("a.b.c.p");
+        assertEquals(expectedLevel, actualLevel);
+
+        // Sleep a little and adjust the level of a leaf logger
+        time.sleep(10);
+        loggers.setLevel("a.b.c.p.X", Level.ERROR);
+        expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10);
+        actualLevel = loggers.level("a.b.c.p.X");
+        assertEquals(expectedLevel, actualLevel);
+
+        // Make sure that the direct parent logger and a sibling logger remain unaffected
+        expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
+        actualLevel = loggers.level("a.b.c.p");
+        assertEquals(expectedLevel, actualLevel);
+
+        expectedLevel = new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME);
+        actualLevel = loggers.level("a.b.c.p.Y");
+        assertEquals(expectedLevel, actualLevel);
+
+        // Set the same level again, and verify that the last modified time hasn't been altered
+        time.sleep(10);
+        loggers.setLevel("a.b.c.p.X", Level.ERROR);
+        expectedLevel = new LoggerLevel(Level.ERROR.toString(), INITIAL_TIME + 10);
+        actualLevel = loggers.level("a.b.c.p.X");
+        assertEquals(expectedLevel, actualLevel);
+    }
+
+    @Test
+    public void testSetRootLevel() {
+        Logger root = logger("root");
+        root.setLevel(Level.ERROR);
+
+        Logger p = logger("a.b.c.p");
+        Logger x = logger("a.b.c.p.X");
+        Logger y = logger("a.b.c.p.Y");
+        Logger z = logger("a.b.c.p.Z");
+        Logger w = logger("a.b.c.s.W");
+        x.setLevel(Level.INFO);
+        y.setLevel(Level.INFO);
+        z.setLevel(Level.INFO);
+        w.setLevel(Level.INFO);
+
+        Loggers loggers = new TestLoggers(root, x, y, z, w);
+
+        List<String> modified = loggers.setLevel("root", Level.DEBUG);
+        assertEquals(Arrays.asList("a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z", "a.b.c.s.W", "root"), modified);
+
+        assertNull(p.getLevel());
+
+        assertEquals(root.getLevel(), Level.DEBUG);
+
+        assertEquals(w.getLevel(), Level.DEBUG);
+        assertEquals(x.getLevel(), Level.DEBUG);
+        assertEquals(y.getLevel(), Level.DEBUG);
+        assertEquals(z.getLevel(), Level.DEBUG);
+
+        Map<String, LoggerLevel> expectedLevels = new HashMap<>();
+        expectedLevels.put("root", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+        expectedLevels.put("a.b.c.p.X", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+        expectedLevels.put("a.b.c.p.Y", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+        expectedLevels.put("a.b.c.p.Z", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+        expectedLevels.put("a.b.c.s.W", new LoggerLevel(Level.DEBUG.toString(), INITIAL_TIME));
+
+        Map<String, LoggerLevel> actualLevels = loggers.allLevels();
+        assertEquals(expectedLevels, actualLevels);
+    }
+
+    private class TestLoggers extends Loggers {

Review Comment:
   I like this mechanism for mocking out the LogManager calls, very nice.
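
   For readers following along, the TestLoggers body is elided from the quoted diff; based on the test's imports (Vector, Enumeration, Stream, Function, Collectors), the mechanism is presumably something like the sketch below. The overridden method names are assumptions, not confirmed by the diff above.

   ```java
   // Hypothetical reconstruction: subclass Loggers and override the members
   // that would otherwise reach into log4j's LogManager, so the test can
   // supply a fixed set of loggers plus a controllable root logger.
   private class TestLoggers extends Loggers {
       private final Logger rootLogger;
       private final Map<String, Logger> currentLoggers;

       public TestLoggers(Logger rootLogger, Logger... knownLoggers) {
           super(time);
           this.rootLogger = rootLogger;
           this.currentLoggers = new HashMap<>(Stream.of(knownLoggers)
                   .collect(Collectors.toMap(Logger::getName, Function.identity())));
       }

       @Override
       Logger lookupLogger(String logger) {
           // Mirror LogManager.getLogger(): create (and retain) a logger on demand
           return currentLoggers.computeIfAbsent(logger, l -> new Logger(logger) { });
       }

       @Override
       Enumeration<Logger> currentLoggers() {
           return new Vector<>(currentLoggers.values()).elements();
       }

       @Override
       Logger rootLogger() {
           return rootLogger;
       }
   }
   ```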



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
