Repository: storm Updated Branches: refs/heads/master 56bc60374 -> dece08fbd
STORM-1267: port backtype.storm.command.set-log-level to java Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3425e7d1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3425e7d1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3425e7d1 Branch: refs/heads/master Commit: 3425e7d12f514c976fa793f31dcbeafa1527bab4 Parents: 4ca7522 Author: Abhishek Agarwal <[email protected]> Authored: Mon Feb 22 16:37:57 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Mon Feb 22 16:37:57 2016 +0530 ---------------------------------------------------------------------- .../org/apache/storm/command/set_log_level.clj | 76 ------------ .../src/jvm/org/apache/storm/command/CLI.java | 25 +++- .../org/apache/storm/command/SetLogLevel.java | 116 +++++++++++++++++++ .../src/jvm/org/apache/storm/utils/Utils.java | 61 +++++++--- .../apache/storm/command/SetLogLevelTest.java | 54 +++++++++ .../jvm/org/apache/storm/command/TestCLI.java | 62 ++++++---- 6 files changed, 272 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/clj/org/apache/storm/command/set_log_level.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj b/storm-core/src/clj/org/apache/storm/command/set_log_level.clj deleted file mode 100644 index 6048246..0000000 --- a/storm-core/src/clj/org/apache/storm/command/set_log_level.clj +++ /dev/null @@ -1,76 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.command.set-log-level - (:use [clojure.tools.cli :only [cli]]) - (:use [org.apache.storm log]) - (:use [org.apache.storm.internal thrift]) - (:import [org.apache.logging.log4j Level]) - (:import [org.apache.storm.generated LogConfig LogLevel LogLevelAction]) - (:gen-class)) - -(defn- get-storm-id - "Get topology id for a running topology from the topology name." - [nimbus name] - (let [info (.getClusterInfo nimbus) - topologies (.get_topologies info) - topology (first (filter (fn [topo] (= name (.get_name topo))) topologies))] - (if topology - (.get_id topology) - (throw (.IllegalArgumentException (str name " is not a running topology")))))) - -(defn- parse-named-log-levels [action] - "Parses [logger name]=[level string]:[optional timeout],[logger name2]... - - e.g. ROOT=DEBUG:30 - root logger, debug for 30 seconds - - org.apache.foo=WARN - org.apache.foo set to WARN indefinitely" - (fn [^String s] - (let [log-args (re-find #"(.*)=([A-Z]+):?(\d*)" s) - name (if (= action LogLevelAction/REMOVE) s (nth log-args 1)) - level (Level/toLevel (nth log-args 2)) - timeout-str (nth log-args 3) - log-level (LogLevel.)] - (if (= action LogLevelAction/REMOVE) - (.set_action log-level action) - (do - (.set_action log-level action) - (.set_target_log_level log-level (.toString level)) - (.set_reset_log_level_timeout_secs log-level - (Integer. (if (= timeout-str "") "0" timeout-str))))) - {name log-level}))) - -(defn- merge-together [previous key val] - (assoc previous key - (if-let [oldval (get previous key)] - (merge oldval val) - val))) - -(defn -main [& args] - (let [[{log-setting :log-setting remove-log-setting :remove-log-setting} [name] _] - (cli args ["-l" "--log-setting" - :parse-fn (parse-named-log-levels LogLevelAction/UPDATE) - :assoc-fn merge-together] - ["-r" "--remove-log-setting" - :parse-fn (parse-named-log-levels LogLevelAction/REMOVE) - :assoc-fn merge-together]) - log-config (LogConfig.)] - (doseq [[log-name log-val] (merge log-setting remove-log-setting)] - (.put_to_named_logger_level log-config log-name log-val)) - (log-message "Sent log config " log-config " for topology " name) - (with-configured-nimbus-connection nimbus - (.setLogConfig nimbus (get-storm-id nimbus name) log-config)))) http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/command/CLI.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/CLI.java b/storm-core/src/jvm/org/apache/storm/command/CLI.java index d4eaa5d..f29debc 100644 --- a/storm-core/src/jvm/org/apache/storm/command/CLI.java +++ b/storm-core/src/jvm/org/apache/storm/command/CLI.java @@ -17,19 +17,18 @@ */ package org.apache.storm.command; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.List; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class CLI { private static final Logger LOG = LoggerFactory.getLogger(CLI.class); private static class Opt { @@ -139,6 +138,20 @@ public class CLI { } }; + /** + * All values are returned as a map + */ + public static final Assoc INTO_MAP = new Assoc() { + @Override + public Object assoc(Object current, Object value) { + if (null == current) { + current = new HashMap<Object, Object>(); + } + ((Map<Object, Object>) current).putAll((Map<Object, Object>) value); + return current; + } + }; + public static class CLIBuilder { private final ArrayList<Opt> opts = new ArrayList<>(); private final ArrayList<Arg> args = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java new file mode 100644 index 0000000..30cea5f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/command/SetLogLevel.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.command; + +import com.google.common.base.Preconditions; + +import org.apache.logging.log4j.Level; +import org.apache.storm.generated.LogConfig; +import org.apache.storm.generated.LogLevel; +import org.apache.storm.generated.LogLevelAction; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +public class SetLogLevel { + + private static final Logger LOG = LoggerFactory.getLogger(SetLogLevel.class); + + public static void main(String[] args) throws Exception { + Map<String, Object> cl = CLI.opt("l", "log-setting", null, new LogLevelsParser(LogLevelAction.UPDATE), CLI.INTO_MAP) + .opt("r", "remove-log-setting", null, new LogLevelsParser(LogLevelAction.REMOVE), CLI.INTO_MAP) + .arg("topologyName", CLI.FIRST_WINS) + .parse(args); + final String topologyName = (String) cl.get("topologyName"); + final LogConfig logConfig = new LogConfig(); + Map<String, LogLevel> logLevelMap = new HashMap<>(); + Map<String, LogLevel> updateLogLevel = (Map<String, LogLevel>) cl.get("l"); + if (null != updateLogLevel) { + logLevelMap.putAll(updateLogLevel); + } + Map<String, LogLevel> removeLogLevel = (Map<String, LogLevel>) cl.get("r"); + if (null != removeLogLevel) { + logLevelMap.putAll(removeLogLevel); + } + + for (Map.Entry<String, LogLevel> entry : logLevelMap.entrySet()) { + logConfig.put_to_named_logger_level(entry.getKey(), entry.getValue()); + } + + NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() { + @Override + public void run(Nimbus.Client nimbus) throws Exception { + String topologyId = Utils.getTopologyId(topologyName, nimbus); + if (null == topologyId) { + throw new IllegalArgumentException(topologyName + " is not a running topology"); + } + nimbus.setLogConfig(topologyId, logConfig); + LOG.info("Log config {} is sent for topology {}", logConfig, topologyName); + } + }); + } + + /** + * Parses [logger name]=[level string]:[optional timeout],[logger name2]... + * + * e.g. ROOT=DEBUG:30 + * root logger, debug for 30 seconds + * + * org.apache.foo=WARN + * org.apache.foo set to WARN indefinitely + */ + static final class LogLevelsParser implements CLI.Parse { + + private LogLevelAction action; + + public LogLevelsParser(LogLevelAction action) { + this.action = action; + } + + @Override + public Object parse(String value) { + final LogLevel logLevel = new LogLevel(); + logLevel.set_action(action); + String name = null; + if (action == LogLevelAction.REMOVE) { + name = value; + } else { + String[] splits = value.split("="); + Preconditions.checkArgument(splits.length == 2, "Invalid log string '%s'", value); + name = splits[0]; + splits = splits[1].split(":"); + Integer timeout = 0; + Level level = Level.valueOf(splits[0]); + logLevel.set_reset_log_level(level.toString()); + if (splits.length > 1) { + timeout = Integer.parseInt(splits[1]); + } + logLevel.set_reset_log_level_timeout_secs(timeout); + } + Map<String, LogLevel> result = new HashMap<>(); + result.put(name, logLevel); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/src/jvm/org/apache/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index b62f99c..fe0c431 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -17,11 +17,22 @@ */ package org.apache.storm.utils; +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.commons.io.input.ClassLoaderObjectInputStream; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.curator.ensemble.exhibitor.Exhibitors; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.storm.Config; import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.BlobStoreAclHandler; @@ -29,22 +40,24 @@ import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.blobstore.LocalFsBlobStore; import org.apache.storm.daemon.JarTransformer; -import org.apache.storm.generated.*; +import org.apache.storm.generated.AccessControl; +import org.apache.storm.generated.AccessControlType; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.ComponentObject; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.KeyNotFoundException; +import org.apache.storm.generated.Nimbus; +import org.apache.storm.generated.ReadableBlobMeta; +import org.apache.storm.generated.SettableBlobMeta; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; import org.apache.storm.localizer.Localizer; import org.apache.storm.nimbus.NimbusInfo; import org.apache.storm.serialization.DefaultSerializationDelegate; import org.apache.storm.serialization.SerializationDelegate; -import clojure.lang.RT; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.io.input.ClassLoaderObjectInputStream; -import org.apache.commons.lang.StringUtils; -import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; -import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; -import org.apache.curator.ensemble.exhibitor.Exhibitors; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -118,6 +131,8 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import clojure.lang.RT; + public class Utils { // A singleton instance allows us to mock delegated static methods in our // tests by subclassing. @@ -1430,21 +1445,29 @@ public class Utils { } public static TopologyInfo getTopologyInfo(String name, String asUser, Map stormConf) { - NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser); - TopologyInfo topologyInfo = null; + try (NimbusClient client = NimbusClient.getConfiguredClientAs(stormConf, asUser)) { + String topologyId = getTopologyId(name, client.getClient()); + if (null != topologyId) { + return client.getClient().getTopologyInfo(topologyId); + } + return null; + } catch(Exception e) { + throw new RuntimeException(e); + } + } + + public static String getTopologyId(String name, Nimbus.Client client) { try { - ClusterSummary summary = client.getClient().getClusterInfo(); + ClusterSummary summary = client.getClusterInfo(); for(TopologySummary s : summary.get_topologies()) { if(s.get_name().equals(name)) { - topologyInfo = client.getClient().getTopologyInfo(s.get_id()); + return s.get_id(); } } } catch(Exception e) { throw new RuntimeException(e); - } finally { - client.close(); } - return topologyInfo; + return null; } /** http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java b/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java new file mode 100644 index 0000000..4582371 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/command/SetLogLevelTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.command; + +import org.apache.storm.generated.LogLevel; +import org.apache.storm.generated.LogLevelAction; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class SetLogLevelTest { + + @Test + public void testUpdateLogLevelParser() { + SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE); + LogLevel logLevel = ((Map<String, LogLevel>) logLevelsParser.parse("com.foo.one=warn")).get("com.foo.one"); + Assert.assertEquals(0, logLevel.get_reset_log_level_timeout_secs()); + Assert.assertEquals("WARN", logLevel.get_reset_log_level()); + + logLevel = ((Map<String, LogLevel>) logLevelsParser.parse("com.foo.two=DEBUG:10")).get("com.foo.two"); + Assert.assertEquals(10, logLevel.get_reset_log_level_timeout_secs()); + Assert.assertEquals("DEBUG", logLevel.get_reset_log_level()); + } + + @Test(expected = NumberFormatException.class) + public void testInvalidTimeout() { + SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE); + logLevelsParser.parse("com.foo.bar=warn:NaN"); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidLogLevel() { + SetLogLevel.LogLevelsParser logLevelsParser = new SetLogLevel.LogLevelsParser(LogLevelAction.UPDATE); + logLevelsParser.parse("com.foo.bar=CRITICAL"); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/3425e7d1/storm-core/test/jvm/org/apache/storm/command/TestCLI.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java index b647458..c9a4b79 100644 --- a/storm-core/test/jvm/org/apache/storm/command/TestCLI.java +++ b/storm-core/test/jvm/org/apache/storm/command/TestCLI.java @@ -18,42 +18,62 @@ package org.apache.storm.command; -import java.util.Map; +import org.junit.Test; + +import java.util.HashMap; import java.util.List; -import java.util.Arrays; +import java.util.Map; -import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class TestCLI { + @Test public void testSimple() throws Exception { Map<String, Object> values = CLI.opt("a", "aa", null) - .opt("b", "bb", 1, CLI.AS_INT) - .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS) - .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST) - .arg("A") - .arg("B", CLI.AS_INT) - .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3"); - assertEquals(6, values.size()); - assertEquals("200", (String)values.get("a")); - assertEquals((Integer)40, (Integer)values.get("b")); - assertEquals((Integer)2, (Integer)values.get("c")); - - List<String> d = (List<String>)values.get("d"); + .opt("b", "bb", 1, CLI.AS_INT) + .opt("c", "cc", 1, CLI.AS_INT, CLI.FIRST_WINS) + .opt("d", "dd", null, CLI.AS_STRING, CLI.INTO_LIST) + .opt("e", "ee", null, new PairParse(), CLI.INTO_MAP) + .arg("A") + .arg("B", CLI.AS_INT) + .parse("-a100", "--aa", "200", "-c2", "-b", "50", "--cc", "100", "A-VALUE", "1", "2", "3", "-b40", "-d1", "-d2", "-d3" + , "-e", "key1=value1", "-e", "key2=value2"); + assertEquals(7, values.size()); + assertEquals("200", (String) values.get("a")); + assertEquals((Integer) 40, (Integer) values.get("b")); + assertEquals((Integer) 2, (Integer) values.get("c")); + + List<String> d = (List<String>) values.get("d"); assertEquals(3, d.size()); assertEquals("1", d.get(0)); assertEquals("2", d.get(1)); assertEquals("3", d.get(2)); - List<String> A = (List<String>)values.get("A"); + List<String> A = (List<String>) values.get("A"); assertEquals(1, A.size()); assertEquals("A-VALUE", A.get(0)); - List<Integer> B = (List<Integer>)values.get("B"); + List<Integer> B = (List<Integer>) values.get("B"); assertEquals(3, B.size()); - assertEquals((Integer)1, B.get(0)); - assertEquals((Integer)2, B.get(1)); - assertEquals((Integer)3, B.get(2)); + assertEquals((Integer) 1, B.get(0)); + assertEquals((Integer) 2, B.get(1)); + assertEquals((Integer) 3, B.get(2)); + + Map<String, String> e = (Map<String, String>) values.get("e"); + assertEquals(2, e.size()); + assertEquals("value1", e.get("key1")); + assertEquals("value2", e.get("key2")); + } + + private static final class PairParse implements CLI.Parse { + + @Override + public Object parse(String value) { + Map<String, String> result = new HashMap<>(); + String[] splits = value.split("="); + result.put(splits[0], splits[1]); + return result; + } } }
