Merge branch '1.x-branch' into metrics_v2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b5ae9c34 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b5ae9c34 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b5ae9c34 Branch: refs/heads/1.x-branch Commit: b5ae9c3426e69eba11446d055649307daecb05c7 Parents: 47be75c b8f76af Author: P. Taylor Goetz <[email protected]> Authored: Wed Dec 20 15:28:37 2017 -0500 Committer: P. Taylor Goetz <[email protected]> Committed: Wed Dec 20 15:28:37 2017 -0500 ---------------------------------------------------------------------- .travis.yml | 1 + conf/storm.yaml.example | 9 +- docs/Eventlogging.md | 33 ++- .../flux/examples/StatefulWordCounter.java | 7 +- .../storm/flux/examples/TestPrintBolt.java | 3 +- .../storm/flux/examples/TestWindowBolt.java | 5 +- .../storm/flux/examples/WordCountClient.java | 20 +- .../apache/storm/flux/examples/WordCounter.java | 11 +- .../main/java/org/apache/storm/flux/Flux.java | 69 +++--- .../java/org/apache/storm/flux/FluxBuilder.java | 234 ++++++++++++------- .../apache/storm/flux/api/TopologySource.java | 8 +- .../org/apache/storm/flux/model/BeanDef.java | 1 + .../storm/flux/model/BeanListReference.java | 5 +- .../apache/storm/flux/model/BeanReference.java | 5 +- .../org/apache/storm/flux/model/BoltDef.java | 1 + .../storm/flux/model/ConfigMethodDef.java | 13 +- .../storm/flux/model/ExecutionContext.java | 23 +- .../apache/storm/flux/model/GroupingDef.java | 3 +- .../org/apache/storm/flux/model/IncludeDef.java | 3 +- .../org/apache/storm/flux/model/ObjectDef.java | 64 ++++- .../apache/storm/flux/model/PropertyDef.java | 17 +- .../org/apache/storm/flux/model/SpoutDef.java | 1 + .../org/apache/storm/flux/model/StreamDef.java | 3 +- .../apache/storm/flux/model/TopologyDef.java | 114 ++++++--- .../storm/flux/model/TopologySourceDef.java | 3 +- .../org/apache/storm/flux/model/VertexDef.java | 3 +- .../apache/storm/flux/parser/FluxParser.java | 157 ++++++++----- .../java/org/apache/storm/flux/TCKTest.java | 4 + .../org/apache/storm/flux/test/TestBolt.java | 21 +- .../resources/configs/config-methods-test.yaml | 38 +++ .../flux/wrappers/bolts/FluxShellBolt.java | 38 +-- .../storm/flux/wrappers/bolts/LogInfoBolt.java | 2 +- .../flux/wrappers/spouts/FluxShellSpout.java | 36 +-- .../apache/storm/kafka/spout/KafkaSpout.java | 84 ++++--- .../storm/kafka/spout/KafkaSpoutConfig.java | 98 ++++---- .../spout/ManualPartitionSubscription.java | 2 +- .../storm/kafka/spout/KafkaSpoutConfigTest.java | 150 +++++++++++- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 40 +++- .../kafka/spout/MaxUncommittedOffsetTest.java | 53 ++++- .../ManualPartitionSubscriptionTest.java | 79 +++++++ storm-core/src/jvm/org/apache/storm/Config.java | 40 +++- .../apache/storm/messaging/netty/Client.java | 7 +- .../apache/storm/metric/EventLoggerBolt.java | 61 ++++- .../storm/metric/FileBasedEventLogger.java | 11 +- .../org/apache/storm/metric/IEventLogger.java | 41 +++- .../storm/validation/ConfigValidation.java | 20 ++ 46 files changed, 1211 insertions(+), 430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b5ae9c34/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b5ae9c34/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java index 89aecee,0602dbf..d7ca48d --- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java @@@ -491,49 -491,28 +491,69 @@@ public class ConfigValidation SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); new IntegerValidator().validateField(name, ((Map) o).get("parallelism.hint")); } + } + + public static class MetricReportersValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + if(o == null) { + return; + } + SimpleTypeValidator.validateField(name, Map.class, o); + if(!((Map) o).containsKey("class") ) { + throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + } + if(!((Map) o).containsKey("daemons") ) { + throw new IllegalArgumentException("Field " + name + " must have map entry with key: daemons"); + } else { + // daemons can only be 'nimbus', 'supervisor', or 'worker' + Object list = ((Map)o).get("daemons"); + if(list == null || !(list instanceof List)){ + throw new IllegalArgumentException("Field 'daemons' must be a non-null list."); + } + List daemonList = (List)list; + for(Object string : daemonList){ + if (string instanceof String && + (((String) string).equals("nimbus") || + ((String) string).equals("supervisor") || + ((String) string).equals("worker"))) { + continue; + } + throw new IllegalArgumentException("Field 'daemons' must contain at least one of the following:" + + " \"nimbus\", \"supervisor\", or \"worker\""); + } + + } + if(((Map)o).containsKey("filter")){ + Map filterMap = (Map)((Map)o).get("filter"); + SimpleTypeValidator.validateField("class", String.class, filterMap.get("class")); + } + SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); + + } } + public static class EventLoggerRegistryValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + if(o == null) { + return; + } + SimpleTypeValidator.validateField(name, Map.class, o); + if(!((Map<?, ?>) o).containsKey("class") ) { + throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + } + + SimpleTypeValidator.validateField(name, String.class, ((Map<?, ?>) o).get("class")); + + if(((Map<?, ?>) o).containsKey("arguments") ) { + SimpleTypeValidator.validateField(name, Map.class, ((Map<?, ?>) o).get("arguments")); + } + } + } + public static class MapOfStringToMapOfStringToObjectValidator extends Validator { @Override public void validateField(String name, Object o) {
