Repository: storm Updated Branches: refs/heads/1.1.x-branch f7b8bb259 -> 9b6d15735
STORM-2862: Move multilang logging to a shared class and make this class configurable. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc65d3bb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc65d3bb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc65d3bb Branch: refs/heads/1.1.x-branch Commit: dc65d3bb9dceab31191ab49e365bd25b17f46377 Parents: f7b8bb2 Author: Heather McCartney <[email protected]> Authored: Wed Dec 20 10:14:53 2017 +0000 Committer: Heather McCartney <[email protected]> Committed: Mon Jan 22 09:20:45 2018 +0000 ---------------------------------------------------------------------- storm-core/src/jvm/org/apache/storm/Config.java | 8 ++ .../jvm/org/apache/storm/spout/ShellSpout.java | 37 ++---- .../jvm/org/apache/storm/task/ShellBolt.java | 38 ++----- .../storm/utils/DefaultShellLogHandler.java | 113 +++++++++++++++++++ .../org/apache/storm/utils/ShellLogHandler.java | 52 +++++++++ .../jvm/org/apache/storm/utils/ShellUtils.java | 17 +++ .../storm/utils/DefaultShellLogHandlerTest.java | 105 +++++++++++++++++ .../org/apache/storm/utils/ShellUtilsTest.java | 103 +++++++++++++++++ 8 files changed, 413 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 4f3bd1c..ba543be 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1604,6 +1604,14 @@ public class Config extends HashMap<String, Object> { public static final String TOPOLOGY_DEBUG = "topology.debug"; /** + * The fully qualified name of a {@link ShellLogHandler} to handle output + * from non-JVM processes e.g. "com.mycompany.CustomShellLogHandler". If + * not provided, org.apache.storm.utils.DefaultLogHandler will be used. + */ + @isString + public static final String TOPOLOGY_MULTILANG_LOG_HANDLER = "topology.multilang.log.handler"; + + /** * The serializer for communication between shell components and non-JVM * processes */ http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java index a5ec72b..afd816b 100644 --- a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java @@ -24,7 +24,9 @@ import org.apache.storm.metric.api.rpc.IShellMetric; import org.apache.storm.multilang.ShellMsg; import org.apache.storm.multilang.SpoutMsg; import org.apache.storm.task.TopologyContext; +import org.apache.storm.utils.ShellLogHandler; import org.apache.storm.utils.ShellProcess; +import org.apache.storm.utils.ShellUtils; import java.util.Arrays; import java.util.Map; @@ -50,6 +52,7 @@ public class ShellSpout implements ISpout { private SpoutOutputCollector _collector; private String[] _command; private Map<String, String> env = new HashMap<>(); + private ShellLogHandler _logHandler; private ShellProcess _process; private volatile boolean _running = true; private volatile RuntimeException _exception; @@ -111,6 +114,9 @@ public class ShellSpout implements ISpout { Number subpid = _process.launch(stormConf, context, changeDirectory); LOG.info("Launched subprocess with pid " + subpid); + _logHandler = ShellUtils.getLogHandler(stormConf); + _logHandler.setUpContext(ShellSpout.class, _process, _context); + heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1)); } @@ -145,7 +151,6 @@ public class ShellSpout implements ISpout { querySubprocess(); } - private void handleMetrics(ShellMsg shellMsg) { //get metric name String name = shellMsg.getMetricName(); @@ -191,7 +196,7 @@ public class ShellSpout implements ISpout { if (command.equals("sync")) { return; } else if (command.equals("log")) { - handleLog(shellMsg); + _logHandler.log(shellMsg); } else if (command.equals("error")) { handleError(shellMsg.getMsg()); } else if (command.equals("emit")) { @@ -221,34 +226,6 @@ public class ShellSpout implements ISpout { } } - - private void handleLog(ShellMsg shellMsg) { - String msg = shellMsg.getMsg(); - msg = "ShellLog " + _process.getProcessInfoString() + " " + msg; - ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); - - switch (logLevel) { - case TRACE: - LOG.trace(msg); - break; - case DEBUG: - LOG.debug(msg); - break; - case INFO: - LOG.info(msg); - break; - case WARN: - LOG.warn(msg); - break; - case ERROR: - LOG.error(msg); - break; - default: - LOG.info(msg); - break; - } - } - private void handleError(String msg) { _collector.reportError(new Exception("Shell Process Exception: " + msg)); } http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java index 3d9f141..d520d07 100644 --- a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java @@ -24,10 +24,12 @@ import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.rpc.IShellMetric; import org.apache.storm.multilang.BoltMsg; import org.apache.storm.multilang.ShellMsg; -import org.apache.storm.topology.ReportedFailedException; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.ShellBoltMessageQueue; +import org.apache.storm.utils.ShellLogHandler; import org.apache.storm.utils.ShellProcess; +import org.apache.storm.utils.ShellUtils; + import clojure.lang.RT; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; @@ -77,6 +79,7 @@ public class ShellBolt implements IBolt { private String[] _command; private Map<String, String> env = new HashMap<>(); + private ShellLogHandler _logHandler; private ShellProcess _process; private volatile boolean _running = true; private volatile Throwable _exception; @@ -150,6 +153,9 @@ public class ShellBolt implements IBolt { Number subpid = _process.launch(stormConf, context, changeDirectory); LOG.info("Launched subprocess with pid " + subpid); + _logHandler = ShellUtils.getLogHandler(stormConf); + _logHandler.setUpContext(ShellBolt.class, _process, _context); + // reader _readerThread = new Thread(new BoltReaderRunnable()); _readerThread.start(); @@ -245,34 +251,6 @@ public class ShellBolt implements IBolt { } } - private void handleLog(ShellMsg shellMsg) { - String msg = shellMsg.getMsg(); - msg = "ShellLog " + _process.getProcessInfoString() + " " + msg; - ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); - - switch (logLevel) { - case TRACE: - LOG.trace(msg); - break; - case DEBUG: - LOG.debug(msg); - break; - case INFO: - LOG.info(msg); - break; - case WARN: - LOG.warn(msg); - break; - case ERROR: - LOG.error(msg); - _collector.reportError(new ReportedFailedException(msg)); - break; - default: - LOG.info(msg); - break; - } - } - private void handleMetrics(ShellMsg shellMsg) { //get metric name String name = shellMsg.getMetricName(); @@ -370,7 +348,7 @@ public class ShellBolt implements IBolt { handleError(shellMsg.getMsg()); break; case "log": - handleLog(shellMsg); + _logHandler.log(shellMsg); break; case "emit": handleEmit(shellMsg); http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java b/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java new file mode 100644 index 0000000..2004bc1 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import org.apache.storm.multilang.ShellMsg; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation of {@link ShellLogHandler}. + */ +public class DefaultShellLogHandler implements ShellLogHandler { + + private Logger log; + + /** + * Save information about the current process. + */ + private ShellProcess process; + + /** + * Default constructor; used when loading with + * Class.forName(...).newInstance(). + */ + public DefaultShellLogHandler() { + } + + private Logger getLogger(final Class<?> ownerCls) { + return LoggerFactory.getLogger( + ownerCls == null ? DefaultShellLogHandler.class : ownerCls); + } + + /** + * This default implementation saves the {@link ShellProcess} so it can + * output the process info string later. + * @see {@link ShellLogHandler#setUpContext} + * + * @param ownerCls + * - the class which instantiated this ShellLogHandler. + * @param process + * - the current {@link ShellProcess}. + * @param context + * - the current {@link TopologyContext}. + */ + public void setUpContext(final Class<?> ownerCls, final ShellProcess process, + final TopologyContext context) { + this.log = getLogger(ownerCls); + this.process = process; + // context is not used by the default implementation, but is included + // in the interface in case it is useful to subclasses + } + + /** + * Log the given message. + * @see {@link ShellLogHandler#log} + * + * @param shellMsg + * - the {@link ShellMsg} to log. + */ + public void log(final ShellMsg shellMsg) { + if (shellMsg == null) { + throw new IllegalArgumentException("shellMsg is required"); + } + String msg = shellMsg.getMsg(); + if (this.log == null) { + this.log = getLogger(null); + } + if (this.process == null) { + msg = "ShellLog " + msg; + } else { + msg = "ShellLog " + process.getProcessInfoString() + " " + msg; + } + ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); + + switch (logLevel) { + case TRACE: + log.trace(msg); + break; + case DEBUG: + log.debug(msg); + break; + case INFO: + log.info(msg); + break; + case WARN: + log.warn(msg); + break; + case ERROR: + log.error(msg); + break; + default: + log.info(msg); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java b/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java new file mode 100644 index 0000000..8463a9e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.utils; + +import org.apache.storm.multilang.ShellMsg; +import org.apache.storm.task.TopologyContext; + +/** + * Handle logging from non-JVM processes. + */ +public interface ShellLogHandler { + + /** + * Called at least once before {@link ShellLogHandler#log} for each + * spout and bolt. Allows implementing classes to save information about + * the current running context e.g. pid, thread, task. + * + * @param ownerCls + * - the class which instantiated this ShellLogHandler. + * @param process + * - the current {@link ShellProcess}. + * @param context + * - the current {@link TopologyContext}. + */ + void setUpContext(Class<?> ownerCls, ShellProcess process, + TopologyContext context); + + /** + * Called by spouts and bolts when they receive a 'log' command from a + * multilang process. + * + * @param msg + * - the {@link ShellMsg} containing the message to log. + */ + void log(ShellMsg msg); +} http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java b/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java index ef869b0..3b0f934 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java @@ -28,6 +28,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.storm.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -502,4 +503,20 @@ abstract public class ShellUtils { } } + public static ShellLogHandler getLogHandler(Map stormConf) { + if (stormConf == null) { + throw new IllegalArgumentException("Config is required"); + } + + String logHandlerClassName = null; + if (stormConf.containsKey(Config.TOPOLOGY_MULTILANG_LOG_HANDLER)) { + try { + logHandlerClassName = stormConf.get(Config.TOPOLOGY_MULTILANG_LOG_HANDLER).toString(); + return (ShellLogHandler) Class.forName(logHandlerClassName).newInstance(); + } catch (ClassCastException | InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new RuntimeException("Error loading ShellLogHandler " + logHandlerClassName, e); + } + } + return new DefaultShellLogHandler(); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java b/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java new file mode 100644 index 0000000..1e2c70d --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.utils; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.storm.multilang.ShellMsg; +import org.apache.storm.multilang.ShellMsg.ShellLogLevel; +import org.junit.Before; +import org.junit.Test; + +public class DefaultShellLogHandlerTest { + + private DefaultShellLogHandler logHandler; + + @Before + public void setUp() { + logHandler = new DefaultShellLogHandler(); + } + + private ShellMsg mockMsg() { + ShellMsg shellMsg = mock(ShellMsg.class); + when(shellMsg.getMsg()).thenReturn("msg"); + when(shellMsg.getLogLevel()).thenReturn(ShellLogLevel.INFO); + return shellMsg; + } + + private ShellProcess mockProcess() { + ShellProcess process = mock(ShellProcess.class); + when(process.getProcessInfoString()).thenReturn("info"); + return process; + } + + /** + * It's fine to pass only null arguments to setUpContext. + */ + @Test + public void setUpContext_allNull() { + ShellMsg msg = mockMsg(); + logHandler.setUpContext(null, null, null); + logHandler.log(msg); + verify(msg).getMsg(); + } + + /** + * Calling setUpContext is optional. + */ + @Test + public void setUpContext_optional() { + ShellMsg msg = mockMsg(); + logHandler.log(msg); + verify(msg).getMsg(); + } + + /** + * A null {@link ShellMsg} will throw IllegalArgumentException. + */ + @Test(expected = IllegalArgumentException.class) + public void handleLog_nullShellMsg() { + logHandler.log(null); + } + + /** + * A null {@link ShellProcess} will not throw an exception. + */ + @Test + public void handleLog_nullProcess() { + ShellMsg msg = mockMsg(); + ShellProcess process = mockProcess(); + logHandler.setUpContext(DefaultShellLogHandlerTest.class, process, null); + logHandler.log(msg); + verify(msg).getMsg(); + } + + /** + * If both {@link ShellMsg} and {@link ShellProcess} are provided, both + * will be used to build the log message. + */ + @Test + public void handleLog_valid() { + ShellMsg msg = mockMsg(); + ShellProcess process = mockProcess(); + logHandler.setUpContext(DefaultShellLogHandlerTest.class, process, null); + logHandler.log(msg); + verify(msg).getMsg(); + verify(process).getProcessInfoString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java b/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java new file mode 100644 index 0000000..087b33b --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.utils; + +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.multilang.ShellMsg; +import org.apache.storm.task.TopologyContext; +import org.junit.Test; + +public class ShellUtilsTest { + + public static class CustomShellLogHandler implements ShellLogHandler { + @Override + public void setUpContext(Class<?> owner, ShellProcess process, + TopologyContext context) { + } + + @Override + public void log(ShellMsg msg) { + } + } + + private Map<String, Object> configureLogHandler(String className) { + Map<String, Object> conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_MULTILANG_LOG_HANDLER, className); + return conf; + } + + /** + * A null config will throw IllegalArgumentException. + */ + @Test(expected = IllegalArgumentException.class) + public void getLogHandler_nullConf() { + ShellUtils.getLogHandler(null); + } + + /** + * If a log handler is not configured, {@link DefaultShellLogHandler} + * will be returned. + */ + @Test + public void getLogHandler_notConfigured() { + ShellLogHandler logHandler = ShellUtils.getLogHandler(new HashMap<String, Object>()); + assertTrue(logHandler.getClass() == DefaultShellLogHandler.class); + } + + /** + * If a log handler cannot be found, a {@link RuntimeException} will be + * thrown with {@link ClassNotFoundException} as the cause. + */ + @Test + public void getLogHandler_notFound() { + try { + configureLogHandler("class.not.Found"); + } catch (RuntimeException e) { + assert(e.getCause().getClass() == ClassNotFoundException.class); + } + } + + /** + * If a log handler is not an instance of {@link ShellLogHandler}, a + * {@link RuntimeException} will be thrown with {@link ClassCastException} + * as the cause. + */ + @Test + public void getLogHandler_notAShellLogHandler() { + try { + configureLogHandler("java.lang.String"); + } catch (RuntimeException e) { + assert(e.getCause().getClass() == ClassCastException.class); + } + } + + /** + * If a log handler is correctly configured, it will be returned. + */ + @Test + public void getLogHandler_customHandler() { + Map<String, Object> conf = configureLogHandler("org.apache.storm.utils.ShellUtilsTest$CustomShellLogHandler"); + ShellLogHandler logHandler = ShellUtils.getLogHandler(conf); + assertTrue(logHandler.getClass() == CustomShellLogHandler.class); + } +} \ No newline at end of file
