Repository: metron
Updated Branches:
  refs/heads/master 7cd39316a -> dcec5a7cf


METRON-1001: Allow metron to ingest parser metadata along with data closes 
apache/incubator-metron#621


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/dcec5a7c
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/dcec5a7c
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/dcec5a7c

Branch: refs/heads/master
Commit: dcec5a7cfb7f8b6544af8ab137dab3f75de9c49a
Parents: 7cd3931
Author: cstella <[email protected]>
Authored: Tue Jul 11 18:25:37 2017 +0100
Committer: cstella <[email protected]>
Committed: Tue Jul 11 18:25:37 2017 +0100

----------------------------------------------------------------------
 .../rest/service/impl/StellarServiceImpl.java   |  2 +-
 .../org/apache/metron/common/Constants.java     |  1 +
 .../common/configuration/FieldTransformer.java  | 10 ++--
 .../configuration/SensorParserConfig.java       | 26 +++++++++
 .../transformation/FieldTransformation.java     |  2 +-
 .../transformation/RemoveTransformation.java    |  2 +-
 .../SimpleFieldTransformation.java              |  2 +-
 .../transformation/StellarTransformation.java   |  5 +-
 .../transformation/FieldTransformationTest.java |  9 ++-
 .../RemoveTransformationTest.java               |  8 +--
 .../StellarTransformationTest.java              | 18 +++---
 .../management/ParserConfigFunctionsTest.java   |  2 +-
 metron-platform/metron-parsers/README.md        | 53 ++++++++++++++++-
 .../apache/metron/parsers/bolt/ParserBolt.java  | 60 +++++++++++++++++---
 .../parsers/topology/ParserTopologyBuilder.java |  9 ++-
 .../metron/spout/pcap/HDFSWriterCallback.java   | 15 +++--
 .../kafka/flux/SimpleStormKafkaBuilder.java     | 54 ++++++++++++++----
 .../storm/kafka/flux/StormKafkaSpout.java       | 11 +++-
 .../apache/storm/kafka/CallbackKafkaSpout.java  |  4 +-
 .../org/apache/storm/kafka/EmitContext.java     |  1 -
 .../metron/stellar/dsl/MapVariableResolver.java | 13 ++++-
 21 files changed, 242 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java
index 1f9af3f..715bd37 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java
@@ -72,7 +72,7 @@ public class StellarServiceImpl implements StellarService {
   public Map<String, Object> applyTransformations(SensorParserContext 
sensorParserContext) {
     JSONObject sampleJson = new 
JSONObject(sensorParserContext.getSampleData());
     
sensorParserContext.getSensorParserConfig().getFieldTransformations().forEach(fieldTransformer
 -> {
-              fieldTransformer.transformAndUpdate(sampleJson, 
sensorParserContext.getSensorParserConfig().getParserConfig(), 
Context.EMPTY_CONTEXT());
+              fieldTransformer.transformAndUpdate(sampleJson, 
Context.EMPTY_CONTEXT(), 
sensorParserContext.getSensorParserConfig().getParserConfig());
             }
     );
     return sampleJson;

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index c2ede49..8b7e478 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 public class Constants {
 
+  public static final String METADATA_PREFIX = "metron.metadata.";
   public static final String ZOOKEEPER_ROOT = "/metron";
   public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + 
"/topology";
   public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000;

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
index b104bba..df80691 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
@@ -103,21 +103,21 @@ public class FieldTransformer implements Serializable {
     }
   }
 
-  public Map<String, Object> transform(JSONObject input, Map<String, Object> 
sensorConfig, Context context) {
+  public Map<String, Object> transform(JSONObject input, Context context, 
Map<String, Object>... sensorConfig) {
     if(getInput() == null || getInput().isEmpty()) {
-      return transformation.map(input, getOutput(), config, sensorConfig, 
context);
+      return transformation.map(input, getOutput(), config, context, 
sensorConfig);
     }
     else {
       Map<String, Object> in = new HashMap<>();
       for(String inputField : getInput()) {
         in.put(inputField, input.get(inputField));
       }
-      return transformation.map(in, getOutput(), config, sensorConfig, 
context);
+      return transformation.map(in, getOutput(), config, context, 
sensorConfig);
     }
   }
 
-  public void transformAndUpdate(JSONObject message, Map<String, Object> 
sensorConfig, Context context) {
-    Map<String, Object> currentValue = transform(message, sensorConfig, 
context);
+  public void transformAndUpdate(JSONObject message, Context context, 
Map<String, Object>... sensorConfig) {
+    Map<String, Object> currentValue = transform(message, context, 
sensorConfig);
     if(currentValue != null) {
       for(Map.Entry<String, Object> kv : currentValue.entrySet()) {
         if(kv.getValue() == null) {

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index e49c99b..d72f462 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -35,6 +35,24 @@ public class SensorParserConfig implements Serializable {
   private String writerClassName;
   private String errorWriterClassName;
   private String invalidWriterClassName;
+  private Boolean readMetadata = false;
+  private Boolean mergeMetadata = false;
+
+  public Boolean mergeMetadata() {
+    return mergeMetadata;
+  }
+
+  public void setMergeMetadata(Boolean mergeMetadata) {
+    this.mergeMetadata = mergeMetadata;
+  }
+
+  public Boolean readMetadata() {
+    return readMetadata;
+  }
+
+  public void setReadMetadata(Boolean readMetadata) {
+    this.readMetadata = readMetadata;
+  }
 
   public String getErrorWriterClassName() {
     return errorWriterClassName;
@@ -129,6 +147,8 @@ public class SensorParserConfig implements Serializable {
             ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
             ", parserConfig=" + parserConfig +
             ", fieldTransformations=" + fieldTransformations +
+            ", readMetadata=" + readMetadata +
+            ", mergeMetadata=" + mergeMetadata +
             '}';
   }
 
@@ -153,6 +173,10 @@ public class SensorParserConfig implements Serializable {
       return false;
     if (getParserConfig() != null ? 
!getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != 
null)
       return false;
+    if (readMetadata() != null ? !readMetadata().equals(that.readMetadata()) : 
that.readMetadata() != null)
+      return false;
+    if (mergeMetadata() != null ? 
!mergeMetadata().equals(that.mergeMetadata()) : that.mergeMetadata() != null)
+      return false;
     return getFieldTransformations() != null ? 
getFieldTransformations().equals(that.getFieldTransformations()) : 
that.getFieldTransformations() == null;
 
   }
@@ -167,6 +191,8 @@ public class SensorParserConfig implements Serializable {
     result = 31 * result + (getInvalidWriterClassName() != null ? 
getInvalidWriterClassName().hashCode() : 0);
     result = 31 * result + (getParserConfig() != null ? 
getParserConfig().hashCode() : 0);
     result = 31 * result + (getFieldTransformations() != null ? 
getFieldTransformations().hashCode() : 0);
+    result = 31 * result + (readMetadata() != null ? readMetadata().hashCode() 
: 0);
+    result = 31 * result + (mergeMetadata() != null ? 
mergeMetadata().hashCode() : 0);
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
index d75df55..138e228 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java
@@ -29,7 +29,7 @@ public interface FieldTransformation extends Serializable {
   Map<String, Object> map( Map<String, Object> input
                          , List<String> outputField
                          , LinkedHashMap<String, Object> fieldMappingConfig
-                         , Map<String, Object> sensorConfig
                          , Context context
+                         , Map<String, Object>... sensorConfig
                          );
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
index a94ccd8..1c875b4 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java
@@ -68,8 +68,8 @@ public class RemoveTransformation implements 
FieldTransformation {
   public Map<String, Object> map( Map<String, Object> input
                                 , final List<String> outputFields
                                 , LinkedHashMap<String, Object> 
fieldMappingConfig
-                                , Map<String, Object> sensorConfig
                                 , Context context
+                                , Map<String, Object>... sensorConfig
                                 ) {
     String condition = getCondition(fieldMappingConfig);
     StellarPredicateProcessor processor = getPredicateProcessor(condition);

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
index 252e5e5..9ccbdc7 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java
@@ -30,8 +30,8 @@ public abstract class SimpleFieldTransformation implements 
FieldTransformation {
   public Map<String, Object> map (Map<String, Object> input
                                 , List<String> outputField
                                 , LinkedHashMap<String, Object> 
fieldMappingConfig
-                                , Map<String, Object> sensorConfig
                                 , Context context
+                                , Map<String, Object>... sensorConfig
                                 )
   {
     Object value = (input == null)

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
index 54c7236..a5d8689 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
@@ -31,14 +31,15 @@ public class StellarTransformation implements 
FieldTransformation {
   public Map<String, Object> map( Map<String, Object> input
                                 , List<String> outputField
                                 , LinkedHashMap<String, Object> 
fieldMappingConfig
-                                , Map<String, Object> sensorConfig
                                 , Context context
+                                , Map<String, Object>... sensorConfig
                                 )
   {
     Map<String, Object> ret = new HashMap<>();
     Map<String, Object> intermediateVariables = new HashMap<>();
     Set<String> outputs = new HashSet<>(outputField);
-    VariableResolver resolver = new MapVariableResolver(ret, 
intermediateVariables, input, sensorConfig);
+    MapVariableResolver resolver = new MapVariableResolver(ret, 
intermediateVariables, input);
+    resolver.add(sensorConfig);
     StellarProcessor processor = new StellarProcessor();
     for(Map.Entry<String, Object> kv : fieldMappingConfig.entrySet()) {
       String oField = kv.getKey();

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
index 57de964..71a0298 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
@@ -43,8 +43,8 @@ public class FieldTransformationTest {
     public Map<String, Object> map( Map<String, Object> input
                                   , List<String> outputField
                                   , LinkedHashMap<String, Object> 
fieldMappingConfig
-                                  , Map<String, Object> sensorConfig
                                   , Context context
+                                  , Map<String, Object>... sensorConfig
                                   )
     {
       return ImmutableMap.of(outputField.get(0), 
Joiner.on(fieldMappingConfig.get("delim").toString()).join(input.entrySet()));
@@ -134,8 +134,8 @@ public class FieldTransformationTest {
                                                                   ,"field2", 
"value2"
                                                                   )
                                                   )
-                                   , c.getParserConfig()
                                    , Context.EMPTY_CONTEXT()
+                                   , c.getParserConfig()
                                    )
                        );
   }
@@ -146,7 +146,10 @@ public class FieldTransformationTest {
 
     Assert.assertNotNull(handler);
     Assert.assertEquals(ImmutableMap.of("protocol", "TCP")
-                       ,handler.transform(new 
JSONObject(ImmutableMap.of("protocol", 6)), c.getParserConfig(), 
Context.EMPTY_CONTEXT())
+                       ,handler.transform(new 
JSONObject(ImmutableMap.of("protocol", 6))
+                                         , Context.EMPTY_CONTEXT()
+                                         , c.getParserConfig()
+                                         )
                        );
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
index 72f7480..a8e6d71 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java
@@ -51,7 +51,7 @@ public class RemoveTransformationTest {
     JSONObject input = new JSONObject(new HashMap<String, Object>() {{
       put("field1", "foo");
     }});
-    handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+    handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
     Assert.assertFalse(input.containsKey("field1"));
   }
 
@@ -78,7 +78,7 @@ public class RemoveTransformationTest {
       JSONObject input = new JSONObject(new HashMap<String, Object>() {{
         put("field1", "foo");
       }});
-      handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
       //no removal happened because field2 does not exist
       Assert.assertTrue(input.containsKey("field1"));
       Assert.assertFalse(input.containsKey("field2"));
@@ -88,7 +88,7 @@ public class RemoveTransformationTest {
         put("field1", "foo");
         put("field2", "bar");
       }});
-      handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
       //no removal happened because field2 != bar
       Assert.assertTrue(input.containsKey("field1"));
       Assert.assertTrue(input.containsKey("field2"));
@@ -99,7 +99,7 @@ public class RemoveTransformationTest {
         put("field2", "foo");
       }});
       //removal of field1 happens because field2 exists and is 'bar'
-      handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
       Assert.assertFalse(input.containsKey("field1"));
       Assert.assertTrue(input.containsKey("field2"));
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
index 01008ca..12f8b5c 100644
--- 
a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
+++ 
b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
@@ -73,7 +73,7 @@ public class StellarTransformationTest {
     SensorParserConfig c = 
SensorParserConfig.fromBytes(Bytes.toBytes(configNumericDomain));
     FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), 
null);
     JSONObject input = new JSONObject();
-    handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+    handler.transformAndUpdate(input,  Context.EMPTY_CONTEXT());
     Assert.assertTrue(input.containsKey("full_hostname"));
     
Assert.assertEquals("1234567890123456789012345678901234567890123456789012345678901234567890",
 input.get("full_hostname"));
     Assert.assertFalse(input.containsKey("domain_without_subdomains"));
@@ -87,7 +87,7 @@ public class StellarTransformationTest {
     FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), 
null);
     JSONObject input = new JSONObject();
     try {
-      handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
     }
     catch(IllegalStateException ex) {
       Assert.assertTrue(ex.getMessage().contains("URL_TO_HOST"));
@@ -138,7 +138,7 @@ public class StellarTransformationTest {
     FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), 
null);
     JSONObject input = new JSONObject(new HashMap<String, Object>() {{
     }});
-    handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+    handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
     int expected = 3;
     Assert.assertEquals(expected, input.get("final_value"));
     Assert.assertFalse(input.containsKey("value1"));
@@ -171,7 +171,7 @@ public class StellarTransformationTest {
     JSONObject input = new JSONObject(new HashMap<String, Object>() {{
       put("timestamp", "2016-01-05 17:02:30");
     }});
-    handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+    handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
     long expected = 1452013350000L;
     Assert.assertEquals(expected, input.get("utc_timestamp"));
     Assert.assertTrue(input.containsKey("timestamp"));
@@ -190,7 +190,7 @@ public class StellarTransformationTest {
     JSONObject input = new JSONObject(new HashMap<String, Object>() {{
       put("timestamp", "2016-01-05 17:02:30");
     }});
-    handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+    handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
     long expected = 1452013350000L;
     Assert.assertEquals(expected, input.get("utc_timestamp"));
     Assert.assertTrue(input.containsKey("timestamp"));
@@ -209,7 +209,7 @@ public class StellarTransformationTest {
     //no input fields => no transformation
     JSONObject input = new JSONObject(new HashMap<String, Object>() {{
     }});
-    handler.transformAndUpdate(input, new HashMap<>(), 
Context.EMPTY_CONTEXT());
+    handler.transformAndUpdate(input,  Context.EMPTY_CONTEXT());
     Assert.assertFalse(input.containsKey("utc_timestamp"));
     Assert.assertTrue(input.isEmpty());
   }
@@ -260,7 +260,7 @@ public class StellarTransformationTest {
         //looking up the data center in portland, which doesn't exist in the 
map, so we default to UTC
         put("dc", "portland");
       }});
-      handler.transformAndUpdate(input, c.getParserConfig(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
       long expected = 1452013350000L;
       Assert.assertEquals(expected, input.get("utc_timestamp"));
       Assert.assertEquals("caseystella.com", input.get("url_host"));
@@ -275,7 +275,7 @@ public class StellarTransformationTest {
         put("url", "https://caseystella.com/blog");
         put("dc", "london");
       }});
-      handler.transformAndUpdate(input, c.getParserConfig(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT(), 
c.getParserConfig());
       long expected = 1452013350000L;
       Assert.assertEquals(expected, input.get("utc_timestamp"));
       Assert.assertEquals("caseystella.com", input.get("url_host"));
@@ -289,7 +289,7 @@ public class StellarTransformationTest {
         put("timestamp", "2016-01-05 17:02:30");
         put("url", "https://caseystella.com/blog");
       }});
-      handler.transformAndUpdate(input, c.getParserConfig(), 
Context.EMPTY_CONTEXT());
+      handler.transformAndUpdate(input, Context.EMPTY_CONTEXT(), 
c.getParserConfig());
       long expected = 1452013350000L;
       Assert.assertEquals(expected, input.get("utc_timestamp"));
       Assert.assertEquals("caseystella.com", input.get("url_host"));

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
index 0587000..1f59ae0 100644
--- 
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
+++ 
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java
@@ -64,7 +64,7 @@ public class ParserConfigFunctionsTest {
     sensorParserConfig.init();
     for (FieldTransformer handler : 
sensorParserConfig.getFieldTransformations()) {
       if (handler != null) {
-        handler.transformAndUpdate(ret, sensorParserConfig.getParserConfig(), 
context);
+        handler.transformAndUpdate(ret, context, 
sensorParserConfig.getParserConfig());
       }
     }
     return ret;

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md 
b/metron-platform/metron-parsers/README.md
index e5d97e0..f03abdf 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -20,8 +20,14 @@ There are two general types types of parsers:
     * `timestampFormat` : The date format of the timestamp to use.  If 
unspecified, the parser assumes the timestamp is ms since unix epoch.
     * `columns` : A map of column names you wish to extract from the CSV to 
their offsets (e.g. `{ 'name' : 1, 'profession' : 3}`  would be a column map 
for extracting the 2nd and 4th columns from a CSV)
     * `separator` : The column separator, `,` by default.
-just
-
+  * JSON Map Parser: `org.apache.metron.parsers.json.JSONMapParser` with 
possible `parserConfig` entries of
+    * `mapStrategy` : A strategy to indicate how to handle multi-dimensional 
Maps.  This is one of
+      * `DROP` : Drop fields which contain maps
+      * `UNFOLD` : Unfold inner maps.  So `{ "foo" : { "bar" : 1} }` would 
turn into `{"foo.bar" : 1}`
+      * `ALLOW` : Allow multidimensional maps
+      * `ERROR` : Throw an error when a multidimensional map is encountered
+    * A field called `timestamp` is expected to exist and, if it does not, 
then the current time is inserted.  
+    
 ## Parser Architecture
 
 ![Architecture](parser_arch.png)
@@ -91,7 +97,10 @@ Example Stellar Filter which includes messages which contain 
a the `field1` fiel
     }
    }
 ```
-* `sensorTopic` : The kafka topic to send the parsed messages to.
+* `sensorTopic` : The kafka topic to send the parsed messages to.  If the 
topic is prefixed and suffixed by `/` 
+then it is assumed to be a regex and will match any topic matching the pattern 
(e.g. `/bro.*/` would match `bro_cust0`, `bro_cust1` and `bro_cust2`)
+* `readMetadata` : Boolean indicating whether to read metadata or not (`false` 
by default).  See below for a discussion about metadata.
+* `mergeMetadata` : Boolean indicating whether to merge metadata with the 
message or not (`false` by default).  See below for a discussion about metadata.
 * `parserConfig` : A JSON Map representing the parser implementation specific 
configuration.
 * `fieldTransformations` : An array of complex objects representing the 
transformations to be done on the message generated from the parser before 
writing out to the kafka topic.
 
@@ -101,6 +110,44 @@ transformation which can be done to a message.  This 
transformation can
 * Add new fields given the values of existing fields of a message
 * Remove existing fields of a message
 
+### Metadata
+
+Metadata is a useful thing to send to Metron and use during enrichment or 
threat intelligence.  
+Consider the following scenarios:
+* You have multiple telemetry sources of the same type that you want to 
+  * ensure downstream analysts can differentiate
+  * ensure profiles consider them independently, as they have different seasonality 
or some other fundamental characteristic
+
+As such, there are two types of metadata that we seek to support in Metron:
+* Environmental metadata : Metadata about the system at large
+   * Consider the possibility that you have multiple kafka topics being 
processed by one parser and you want to tag the messages with the kafka topic
+   * At the moment, only the kafka topic is kept as the field name.
+* Custom metadata: Custom metadata from an individual telemetry source that 
one might want to use within Metron. 
+
+Metadata is controlled by two fields in the parser:
+* `readMetadata` : This is a boolean indicating whether metadata will be read 
and made available to Field 
+transformations (i.e. Stellar field transformations).  The default is `false`.
+* `mergeMetadata` : This is a boolean indicating whether metadata fields will 
be merged with the message automatically.  
+That is to say, if this property is set to `true` then every metadata field 
will become part of the messages and, 
+consequently, also available for use in field transformations.
+#### Field Naming
+
+In order to avoid collisions from metadata fields, metadata fields will be 
prefixed with `metron.metadata.`.  
+So, for instance the kafka topic would be in the field `metron.metadata.topic`.
+
+#### Specifying Custom Metadata
+Custom metadata is specified by sending a JSON Map in the key.  If no key is 
sent, then, obviously, no metadata will be parsed.
+For instance, sending a metadata field called `customer_id` could be done by 
sending
+```
+{
+"customer_id" : "my_customer_id"
+}
+```
+in the kafka key.  This would be exposed as the field 
`metron.metadata.customer_id` to stellar field transformations
+and, if `mergeMetadata` is `true`, would also be available as a field in its own right.
+
+
+
 ### `fieldTransformation` configuration
 
 The format of a `fieldTransformation` is as follows:

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 6fe2ff0..d78487b 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
@@ -29,6 +30,7 @@ import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
@@ -41,18 +43,16 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.metron.common.Constants.METADATA_PREFIX;
+
 public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
+  private static final int KEY_INDEX = 1;
   private static final Logger LOG = LoggerFactory.getLogger(ParserBolt.class);
   private OutputCollector collector;
   private MessageParser<JSONObject> parser;
@@ -121,6 +121,39 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
     StellarFunctions.initialize(stellarContext);
   }
 
+  /**
+   * Collect the metadata associated with a tuple.
+   *
+   * Two sources are consulted:
+   * <ul>
+   *   <li>Environmental metadata: every tuple field positioned after the key field, stored under
+   *       {@code METADATA_PREFIX + fieldName} when both the name and the value are present.</li>
+   *   <li>Custom metadata: the tuple's key field (index {@code KEY_INDEX}), interpreted as a
+   *       UTF-8 encoded JSON map; each entry is stored under {@code METADATA_PREFIX + key}.</li>
+   * </ul>
+   * Custom metadata is applied last, so it overwrites an environmental entry of the same name.
+   *
+   * @param t The tuple to extract metadata from
+   * @param readMetadata If false, no metadata is read and an empty map is returned
+   * @return A mutable map of prefixed metadata field names to values; never null
+   * @throws IllegalStateException if the key is non-empty but cannot be parsed as a JSON map
+   */
+  private Map<String, Object> getMetadata(Tuple t, boolean readMetadata) {
+    Map<String, Object> ret = new HashMap<>();
+    if(!readMetadata) {
+      return ret;
+    }
+    Fields tupleFields = t.getFields();
+    // Fields beyond the key are treated as environmental metadata.
+    for(int i = KEY_INDEX + 1;i < tupleFields.size();++i) {
+      String envMetadataFieldName = tupleFields.get(i);
+      Object envMetadataFieldValue = t.getValue(i);
+      if(!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue != null) {
+        ret.put(METADATA_PREFIX + envMetadataFieldName, envMetadataFieldValue);
+      }
+    }
+    byte[] keyObj = t.getBinary(KEY_INDEX);
+    if(keyObj == null) {
+      // No key was sent, so there is no custom metadata.
+      return ret;
+    }
+    // Decode explicitly as UTF-8 rather than relying on the platform default charset.
+    String keyStr = new String(keyObj, java.nio.charset.StandardCharsets.UTF_8);
+    if(StringUtils.isEmpty(keyStr)) {
+      return ret;
+    }
+    try {
+      Map<String, Object> metadata = JSONUtils.INSTANCE.load(keyStr, new TypeReference<Map<String, Object>>() {
+      });
+      // Guard against a null result (presumably what a key holding the JSON literal null yields).
+      if(metadata != null) {
+        for(Map.Entry<String, Object> kv : metadata.entrySet()) {
+          ret.put(METADATA_PREFIX + kv.getKey(), kv.getValue());
+        }
+      }
+    } catch (IOException e) {
+      String reason = "Unable to parse metadata; expected JSON Map: " + keyStr;
+      LOG.error(reason, e);
+      throw new IllegalStateException(reason, e);
+    }
+    return ret;
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void execute(Tuple tuple) {
@@ -132,18 +165,29 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable {
       boolean ackTuple = !writer.handleAck();
       int numWritten = 0;
       if(sensorParserConfig != null) {
+        Map<String, Object> metadata = getMetadata(tuple, 
sensorParserConfig.readMetadata());
         List<FieldValidator> fieldValidations = 
getConfigurations().getFieldValidations();
         Optional<List<JSONObject>> messages = 
parser.parseOptional(originalMessage);
         for (JSONObject message : messages.orElse(Collections.emptyList())) {
           message.put(Constants.SENSOR_TYPE, getSensorType());
+          if(sensorParserConfig.mergeMetadata()) {
+            message.putAll(metadata);
+          }
           for (FieldTransformer handler : 
sensorParserConfig.getFieldTransformations()) {
             if (handler != null) {
-              handler.transformAndUpdate(message, 
sensorParserConfig.getParserConfig(), stellarContext);
+              if(!sensorParserConfig.mergeMetadata()) {
+                //if we haven't merged metadata, then we need to pass them 
along as configuration params.
+                handler.transformAndUpdate(message, stellarContext, 
sensorParserConfig.getParserConfig(), metadata);
+              }
+              else {
+                handler.transformAndUpdate(message, stellarContext, 
sensorParserConfig.getParserConfig());
+              }
             }
           }
           if(!message.containsKey(Constants.GUID)) {
             message.put(Constants.GUID, UUID.randomUUID().toString());
           }
+
           if (parser.validate(message) && (filter == null || 
filter.emitTuple(message, stellarContext))) {
             numWritten++;
             List<FieldValidator> failedValidators = 
getFailedValidators(message, fieldValidations);

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 0c88573..feac80b 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -132,7 +132,14 @@ public class ParserTopologyBuilder {
     if(securityProtocol.isPresent()) {
       kafkaSpoutConfigOptions.putIfAbsent("security.protocol", 
securityProtocol.get());
     }
-    return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, 
Arrays.asList("value"), kafkaSpoutConfigOptions);
+    return SimpleStormKafkaBuilder.create( inputTopic
+                                         , zkQuorum
+                                         , Arrays.asList( 
SimpleStormKafkaBuilder.FieldsConfiguration.VALUE.getFieldName()
+                                                        , 
SimpleStormKafkaBuilder.FieldsConfiguration.KEY.getFieldName()
+                                                        , 
SimpleStormKafkaBuilder.FieldsConfiguration.TOPIC.getFieldName()
+                                                        )
+                                         , kafkaSpoutConfigOptions
+                                         );
   }
 
   private static KafkaWriter createKafkaWriter( Optional<String> broker

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index a6823e6..21d5110 100644
--- 
a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ 
b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -22,6 +22,7 @@ import com.google.common.base.Joiner;
 import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
 import org.apache.storm.kafka.Callback;
 import org.apache.storm.kafka.EmitContext;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,12 +159,18 @@ public class HDFSWriterCallback implements Callback {
     @Override
     public void initialize(EmitContext context) {
         this.context = context;
-        Object topics = context.get(EmitContext.Type.TOPIC);
-        if(topics instanceof List) {
-            this.topic = Joiner.on(",").join((List<String>)topics);
+        KafkaSpoutConfig spoutConfig = 
context.get(EmitContext.Type.SPOUT_CONFIG);
+        if(spoutConfig != null && spoutConfig.getSubscription() != null) {
+            this.topic = spoutConfig.getSubscription().getTopicsString();
+            if(this.topic.length() > 0) {
+                int len = this.topic.length();
+                if(this.topic.charAt(0) == '[' && this.topic.charAt(len - 1) 
== ']') {
+                    this.topic = this.topic.substring(1, len - 1);
+                }
+            }
         }
         else {
-            this.topic = "" + topics;
+            throw new IllegalStateException("Unable to initialize, because 
spout config is not correctly specified");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index 592859e..4db1302 100644
--- 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.metron.storm.kafka.flux;
 
 import com.google.common.base.Joiner;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -32,8 +33,10 @@ import org.apache.storm.topology.OutputFieldsGetter;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
+import java.io.Serializable;
 import java.util.*;
 import java.util.function.Function;
+import java.util.regex.Pattern;
 
 /**
  * This is a convenience layer on top of the KafkaSpoutConfig.Builder 
available in storm-kafka-client.
@@ -64,6 +67,10 @@ public class SimpleStormKafkaBuilder<K, V> extends 
KafkaSpoutConfig.Builder<K, V
       this.fieldName = fieldName;
     }
 
+    public String getFieldName() { // Returns the field name this constant was constructed with.
+      return fieldName;
+    }
+
     /**
      * Return a list of the enums
      * @param configs
@@ -147,12 +154,11 @@ public class SimpleStormKafkaBuilder<K, V> extends 
KafkaSpoutConfig.Builder<K, V
 
   public static String DEFAULT_DESERIALIZER = 
ByteArrayDeserializer.class.getName();
 
-  private String topic;
 
   /**
    * Create an object with the specified properties.  This will expose fields 
"key" and "value."
    * @param kafkaProps The special kafka properties
-   * @param topic The kafka topic. TODO: In the future, support multiple 
topics and regex patterns.
+   * @param topic The kafka topic.
    * @param zkQuorum The zookeeper quorum.  We will use this to pull the 
brokers from this.
    */
   public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
@@ -176,16 +182,48 @@ public class SimpleStormKafkaBuilder<K, V> extends 
KafkaSpoutConfig.Builder<K, V
                                 , List<String> fieldsConfiguration
                                 )
   {
+    this(kafkaProps, toSubscription(topic), zkQuorum, fieldsConfiguration);
+  }
+
+  /**
+   * Create an object with the specified properties and exposing the specified 
fields.
+   * @param kafkaProps The special kafka properties
+   * @param subscription The subscription to the kafka topic(s)
+   * @param zkQuorum The zookeeper quorum.  We will use this to pull the 
brokers from this.
+   * @param fieldsConfiguration The fields to expose in the storm tuple 
emitted.
+   */
+  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
+                                , Subscription subscription
+                                , String zkQuorum
+                                , List<String> fieldsConfiguration
+                                )
+  {
     super( getBootstrapServers(zkQuorum, kafkaProps)
          , 
createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)),
 DEFAULT_DESERIALIZER)
          , 
createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
 DEFAULT_DESERIALIZER)
-         , topic
+         , subscription
     );
     setProp(kafkaProps);
     setRecordTranslator(new 
SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration)));
-    this.topic = topic;
   }
 
+
+  /**
+   * Convert a topic specification into a subscription.
+   *
+   * A specification that both starts and ends with a forward slash (e.g. {@code /foo[0-9]+/}) is
+   * treated as a regular expression: the surrounding slashes are stripped and the remainder is
+   * compiled into a pattern subscription.  Anything else is treated as a literal topic name.
+   *
+   * @param topicOrSubscription The topic name, or a slash-delimited regular expression
+   * @return A pattern subscription for slash-delimited input, otherwise a named subscription
+   * @throws IllegalArgumentException if the specification is null or empty
+   * @throws java.util.regex.PatternSyntaxException if the slash-delimited expression is not a valid regex
+   */
+  private static Subscription toSubscription(String topicOrSubscription) {
+    if (StringUtils.isEmpty(topicOrSubscription)) {
+      throw new IllegalArgumentException("Topic name is invalid (empty or null): " + topicOrSubscription);
+    }
+    int length = topicOrSubscription.length();
+    // At least two characters are required for distinct opening and closing slashes; without this
+    // check a lone "/" would reach substring(1, 0) and throw StringIndexOutOfBoundsException.
+    boolean isPattern = length >= 2
+                     && topicOrSubscription.charAt(0) == '/'
+                     && topicOrSubscription.charAt(length - 1) == '/';
+    if(isPattern) {
+      //pattern, so strip off the preceding and ending slashes
+      String substr = topicOrSubscription.substring(1, length - 1);
+      return new PatternSubscription(Pattern.compile(substr));
+    }
+    else {
+      return new NamedSubscription(topicOrSubscription);
+    }
+  }
+
+
   private static <T> Class<Deserializer<T>> createDeserializer( 
Optional<String> deserializerClass
                                                 , String 
defaultDeserializerClass
                                                 )
@@ -210,14 +248,6 @@ public class SimpleStormKafkaBuilder<K, V> extends 
KafkaSpoutConfig.Builder<K, V
   }
 
   /**
-   * Get the kafka topic.  TODO: In the future, support multiple topics and 
regex patterns.
-   * @return
-   */
-  public String getTopic() {
-    return topic;
-  }
-
-  /**
    * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields. 
 Also, configure the spout
    * using a Map that configures both kafka as well as the spout (see the 
properties in SpoutConfiguration).
    * @param topic

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
index 514a21d..8c66a92 100644
--- 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java
@@ -35,13 +35,10 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> {
   private static final Logger LOG = Logger.getLogger(StormKafkaSpout.class);
   protected KafkaSpoutConfig<K,V> _spoutConfig;
-  protected String _topic;
-
   protected AtomicBoolean isShutdown = new AtomicBoolean(false);
 
   public StormKafkaSpout(SimpleStormKafkaBuilder<K,V> builder) {
     super(builder.build());
-    this._topic = builder.getTopic();
     this._spoutConfig = builder.build();
   }
 
@@ -71,5 +68,13 @@ public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> {
       //see https://issues.apache.org/jira/browse/STORM-2184
       LOG.warn("You can generally ignore these, as per 
https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we);
     }
+    catch(IllegalStateException ise) {
+      if(ise.getMessage().contains("This consumer has already been closed")) {
+        LOG.warn(ise.getMessage());
+      }
+      else {
+        throw ise;
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
index 8592e13..acd7c5f 100644
--- 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
@@ -49,8 +49,8 @@ public class CallbackKafkaSpout<K, V> extends 
StormKafkaSpout<K, V> {
   public void initialize(TopologyContext context) {
     _callback = createCallback(callbackClazz);
     _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, 
_spoutConfig)
-            .with(EmitContext.Type.UUID, context.getStormId())
-            .with(EmitContext.Type.TOPIC, _topic);
+                                .with(EmitContext.Type.UUID, 
context.getStormId())
+                                ;
     _callback.initialize(_context);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
index eac3ea8..b53cd9d 100644
--- 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
@@ -35,7 +35,6 @@ public class EmitContext implements Cloneable,Serializable {
      */
     public enum Type{
         STREAM_ID(String.class)
-        ,TOPIC(String.class) //TODO: This should be pulled from the message 
directly with the new spout when we want to support multiple topics.
         ,PARTITION(Integer.class)
         ,TASK_ID(Integer.class)
         ,UUID(String.class)

http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java
----------------------------------------------------------------------
diff --git 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java
 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java
index 5e9ffff..fdf740c 100644
--- 
a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java
+++ 
b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java
@@ -30,12 +30,19 @@ public class MapVariableResolver implements 
VariableResolver {
     if(variableMappingOne != null) {
       variableMappings.add(variableMappingOne);
     }
-    for(Map m : variableMapping) {
-      if(m != null) {
-        this.variableMappings.add(m);
+    add(variableMapping);
+  }
+
+  public void add(Map... ms) { // Appends every non-null map in ms to this resolver's variable mappings.
+    if(ms != null) { // a null varargs array is tolerated and adds nothing
+      for (Map m : ms) {
+        if (m != null) { // null entries are skipped rather than stored
+          this.variableMappings.add(m);
+        }
       }
     }
   }
+
   @Override
   public Object resolve(String variable) {
     for(Map variableMapping : variableMappings) {

Reply via email to