STORM-2153: address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/868de5b3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/868de5b3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/868de5b3

Branch: refs/heads/1.x-branch
Commit: 868de5b33b8145d787a9b3d08bdac6908591790d
Parents: b5ae9c3
Author: P. Taylor Goetz <[email protected]>
Authored: Fri Dec 22 14:42:35 2017 -0500
Committer: P. Taylor Goetz <[email protected]>
Committed: Fri Dec 22 14:42:35 2017 -0500

----------------------------------------------------------------------
 .../reporters/ScheduledStormReporter.java       |  2 +-
 .../org/apache/storm/utils/DisruptorQueue.java  | 18 ++++++++-----
 .../storm/validation/ConfigValidation.java      | 28 ++++++++++++--------
 3 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
index e88b41b..b7ffa61 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -81,7 +81,7 @@ public abstract class ScheduledStormReporter implements StormReporter{
                 filter = (StormMetricsFilter) Metrics2Utils.instantiate(clazz);
                 filter.prepare(filterConf);
             } catch (Exception e) {
-                LOG.warn("Unable to instantiate StormMetricsFilter class: {}", clazz);
+                throw new RuntimeException("Unable to instantiate StormMetricsFilter class: " + clazz);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index ca8568c..d7cf401 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -437,12 +437,17 @@ public class DisruptorQueue implements IStatefulObject {
 
         _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
         _flusher.start();
-        METRICS_TIMER.schedule(new TimerTask(){
-            @Override
-            public void run() {
-                _disruptorMetrics.set(_metrics);
-            }
-        }, 15000, 15000);
+        try {
+            METRICS_TIMER.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    _disruptorMetrics.set(_metrics);
+                }
+            }, 15000, 15000);
+        } catch (IllegalStateException e){
+            // Ignore. IllegalStateException is thrown by Timer.schedule() if the timer
+            // has been cancelled. (This happens in unit tests)
+        }
     }
 
     public String getName() {
@@ -458,6 +463,7 @@ public class DisruptorQueue implements IStatefulObject {
             publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
             _flusher.close();
             _metrics.close();
+            METRICS_TIMER.cancel();
         } catch (InsufficientCapacityException e) {
             //This should be impossible
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/868de5b3/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
index d7ca48d..9d9db33 100644
--- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -494,6 +494,12 @@ public class ConfigValidation {
     }
 
     public static class MetricReportersValidator extends Validator {
+        private static final String NIMBUS = "nimbus";
+        private static final String SUPERVISOR = "supervisor";
+        private static final String WORKER = "worker";
+        private static final String CLASS = "class";
+        private static final String FILTER = "filter";
+        private static final String DAEMONS = "daemons";
 
         @Override
         public void validateField(String name, Object o) {
@@ -501,23 +507,23 @@ public class ConfigValidation {
                 return;
             }
             SimpleTypeValidator.validateField(name, Map.class, o);
-            if(!((Map) o).containsKey("class") ) {
+            if(!((Map) o).containsKey(CLASS) ) {
                 throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class");
             }
-            if(!((Map) o).containsKey("daemons") ) {
+            if(!((Map) o).containsKey(DAEMONS) ) {
                 throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons");
             } else {
                 // daemons can only be 'nimbus', 'supervisor', or 'worker'
-                Object list = ((Map)o).get("daemons");
-                if(list == null || !(list instanceof List)){
+                Object list = ((Map)o).get(DAEMONS);
+                if(!(list instanceof List)){
                     throw new IllegalArgumentException("Field 'daemons' must be a non-null list.");
                 }
                 List daemonList = (List)list;
                 for(Object string : daemonList){
                     if (string instanceof String &&
-                            (((String) string).equals("nimbus") ||
-                                    ((String) string).equals("supervisor") ||
-                                    ((String) string).equals("worker"))) {
+                            (string.equals(NIMBUS) ||
+                                    string.equals(SUPERVISOR) ||
+                                    string.equals(WORKER))) {
                         continue;
                     }
                     throw new IllegalArgumentException("Field 'daemons' must contain at least one of the following:" +
@@ -525,11 +531,11 @@ public class ConfigValidation {
                 }
             }
 
-            if(((Map)o).containsKey("filter")){
-                Map filterMap = (Map)((Map)o).get("filter");
-                SimpleTypeValidator.validateField("class", String.class, filterMap.get("class"));
+            if(((Map)o).containsKey(FILTER)){
+                Map filterMap = (Map)((Map)o).get(FILTER);
+                SimpleTypeValidator.validateField(CLASS, String.class, filterMap.get(CLASS));
             }
-            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class"));
+            SimpleTypeValidator.validateField(name, String.class, ((Map) o).get(CLASS));
         }
     }
