Merge branch '1.x-branch' into metrics_v2

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b5ae9c34
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b5ae9c34
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b5ae9c34

Branch: refs/heads/1.x-branch
Commit: b5ae9c3426e69eba11446d055649307daecb05c7
Parents: 47be75c b8f76af
Author: P. Taylor Goetz <[email protected]>
Authored: Wed Dec 20 15:28:37 2017 -0500
Committer: P. Taylor Goetz <[email protected]>
Committed: Wed Dec 20 15:28:37 2017 -0500

----------------------------------------------------------------------
 .travis.yml                                     |   1 +
 conf/storm.yaml.example                         |   9 +-
 docs/Eventlogging.md                            |  33 ++-
 .../flux/examples/StatefulWordCounter.java      |   7 +-
 .../storm/flux/examples/TestPrintBolt.java      |   3 +-
 .../storm/flux/examples/TestWindowBolt.java     |   5 +-
 .../storm/flux/examples/WordCountClient.java    |  20 +-
 .../apache/storm/flux/examples/WordCounter.java |  11 +-
 .../main/java/org/apache/storm/flux/Flux.java   |  69 +++---
 .../java/org/apache/storm/flux/FluxBuilder.java | 234 ++++++++++++-------
 .../apache/storm/flux/api/TopologySource.java   |   8 +-
 .../org/apache/storm/flux/model/BeanDef.java    |   1 +
 .../storm/flux/model/BeanListReference.java     |   5 +-
 .../apache/storm/flux/model/BeanReference.java  |   5 +-
 .../org/apache/storm/flux/model/BoltDef.java    |   1 +
 .../storm/flux/model/ConfigMethodDef.java       |  13 +-
 .../storm/flux/model/ExecutionContext.java      |  23 +-
 .../apache/storm/flux/model/GroupingDef.java    |   3 +-
 .../org/apache/storm/flux/model/IncludeDef.java |   3 +-
 .../org/apache/storm/flux/model/ObjectDef.java  |  64 ++++-
 .../apache/storm/flux/model/PropertyDef.java    |  17 +-
 .../org/apache/storm/flux/model/SpoutDef.java   |   1 +
 .../org/apache/storm/flux/model/StreamDef.java  |   3 +-
 .../apache/storm/flux/model/TopologyDef.java    | 114 ++++++---
 .../storm/flux/model/TopologySourceDef.java     |   3 +-
 .../org/apache/storm/flux/model/VertexDef.java  |   3 +-
 .../apache/storm/flux/parser/FluxParser.java    | 157 ++++++++-----
 .../java/org/apache/storm/flux/TCKTest.java     |   4 +
 .../org/apache/storm/flux/test/TestBolt.java    |  21 +-
 .../resources/configs/config-methods-test.yaml  |  38 +++
 .../flux/wrappers/bolts/FluxShellBolt.java      |  38 +--
 .../storm/flux/wrappers/bolts/LogInfoBolt.java  |   2 +-
 .../flux/wrappers/spouts/FluxShellSpout.java    |  36 +--
 .../apache/storm/kafka/spout/KafkaSpout.java    |  84 ++++---
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  98 ++++----
 .../spout/ManualPartitionSubscription.java      |   2 +-
 .../storm/kafka/spout/KafkaSpoutConfigTest.java | 150 +++++++++++-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |  40 +++-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |  53 ++++-
 .../ManualPartitionSubscriptionTest.java        |  79 +++++++
 storm-core/src/jvm/org/apache/storm/Config.java |  40 +++-
 .../apache/storm/messaging/netty/Client.java    |   7 +-
 .../apache/storm/metric/EventLoggerBolt.java    |  61 ++++-
 .../storm/metric/FileBasedEventLogger.java      |  11 +-
 .../org/apache/storm/metric/IEventLogger.java   |  41 +++-
 .../storm/validation/ConfigValidation.java      |  20 ++
 46 files changed, 1211 insertions(+), 430 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b5ae9c34/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/b5ae9c34/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
index 89aecee,0602dbf..d7ca48d
--- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@@ -491,49 -491,28 +491,69 @@@ public class ConfigValidation 
              SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
              new IntegerValidator().validateField(name, ((Map) 
o).get("parallelism.hint"));
          }
 +    }
 +
 +    public static class MetricReportersValidator extends Validator {
 +
 +        @Override
 +        public void validateField(String name, Object o) {
 +            if(o == null) {
 +                return;
 +            }
 +            SimpleTypeValidator.validateField(name, Map.class, o);
 +            if(!((Map) o).containsKey("class") ) {
 +                throw new IllegalArgumentException( "Field " + name + " must 
have map entry with key: class");
 +            }
 +            if(!((Map) o).containsKey("daemons") ) {
 +                throw new IllegalArgumentException("Field " + name + " must 
have map entry with key: daemons");
 +            } else {
 +                // daemons can only be 'nimbus', 'supervisor', or 'worker'
 +                Object list = ((Map)o).get("daemons");
 +                if(list == null || !(list instanceof List)){
 +                    throw new IllegalArgumentException("Field 'daemons' must 
be a non-null list.");
 +                }
 +                List daemonList = (List)list;
 +                for(Object string : daemonList){
 +                    if (string instanceof String &&
 +                            (((String) string).equals("nimbus") ||
 +                                    ((String) string).equals("supervisor") ||
 +                                    ((String) string).equals("worker"))) {
 +                        continue;
 +                    }
 +                    throw new IllegalArgumentException("Field 'daemons' must 
contain at least one of the following:" +
 +                            " \"nimbus\", \"supervisor\", or \"worker\"");
 +                }
 +
 +            }
 +            if(((Map)o).containsKey("filter")){
 +                Map filterMap = (Map)((Map)o).get("filter");
 +                SimpleTypeValidator.validateField("class", String.class, 
filterMap.get("class"));
 +            }
 +            SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
 +
 +        }
      }
  
+     public static class EventLoggerRegistryValidator extends Validator {
+ 
+         @Override
+         public void validateField(String name, Object o) {
+             if(o == null) {
+                 return;
+             }
+             SimpleTypeValidator.validateField(name, Map.class, o);
+             if(!((Map<?, ?>) o).containsKey("class") ) {
+                 throw new IllegalArgumentException( "Field " + name + " must 
have map entry with key: class");
+             }
+ 
+             SimpleTypeValidator.validateField(name, String.class, ((Map<?, 
?>) o).get("class"));
+ 
+             if(((Map<?, ?>) o).containsKey("arguments") ) {
+                 SimpleTypeValidator.validateField(name, Map.class, ((Map<?, 
?>) o).get("arguments"));
+             }
+         }
+     }
+ 
      public static class MapOfStringToMapOfStringToObjectValidator extends 
Validator {
        @Override
        public  void validateField(String name, Object o) {

Reply via email to