Repository: metron Updated Branches: refs/heads/master 7cd39316a -> dcec5a7cf
METRON-1001: Allow metron to ingest parser metadata along with data closes apache/incubator-metron#621 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/dcec5a7c Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/dcec5a7c Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/dcec5a7c Branch: refs/heads/master Commit: dcec5a7cfb7f8b6544af8ab137dab3f75de9c49a Parents: 7cd3931 Author: cstella <[email protected]> Authored: Tue Jul 11 18:25:37 2017 +0100 Committer: cstella <[email protected]> Committed: Tue Jul 11 18:25:37 2017 +0100 ---------------------------------------------------------------------- .../rest/service/impl/StellarServiceImpl.java | 2 +- .../org/apache/metron/common/Constants.java | 1 + .../common/configuration/FieldTransformer.java | 10 ++-- .../configuration/SensorParserConfig.java | 26 +++++++++ .../transformation/FieldTransformation.java | 2 +- .../transformation/RemoveTransformation.java | 2 +- .../SimpleFieldTransformation.java | 2 +- .../transformation/StellarTransformation.java | 5 +- .../transformation/FieldTransformationTest.java | 9 ++- .../RemoveTransformationTest.java | 8 +-- .../StellarTransformationTest.java | 18 +++--- .../management/ParserConfigFunctionsTest.java | 2 +- metron-platform/metron-parsers/README.md | 53 ++++++++++++++++- .../apache/metron/parsers/bolt/ParserBolt.java | 60 +++++++++++++++++--- .../parsers/topology/ParserTopologyBuilder.java | 9 ++- .../metron/spout/pcap/HDFSWriterCallback.java | 15 +++-- .../kafka/flux/SimpleStormKafkaBuilder.java | 54 ++++++++++++++---- .../storm/kafka/flux/StormKafkaSpout.java | 11 +++- .../apache/storm/kafka/CallbackKafkaSpout.java | 4 +- .../org/apache/storm/kafka/EmitContext.java | 1 - .../metron/stellar/dsl/MapVariableResolver.java | 13 ++++- 21 files changed, 242 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java index 1f9af3f..715bd37 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StellarServiceImpl.java @@ -72,7 +72,7 @@ public class StellarServiceImpl implements StellarService { public Map<String, Object> applyTransformations(SensorParserContext sensorParserContext) { JSONObject sampleJson = new JSONObject(sensorParserContext.getSampleData()); sensorParserContext.getSensorParserConfig().getFieldTransformations().forEach(fieldTransformer -> { - fieldTransformer.transformAndUpdate(sampleJson, sensorParserContext.getSensorParserConfig().getParserConfig(), Context.EMPTY_CONTEXT()); + fieldTransformer.transformAndUpdate(sampleJson, Context.EMPTY_CONTEXT(), sensorParserContext.getSensorParserConfig().getParserConfig()); } ); return sampleJson; http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java index c2ede49..8b7e478 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java @@ -24,6 +24,7 @@ import java.util.Map; 
public class Constants { + public static final String METADATA_PREFIX = "metron.metadata."; public static final String ZOOKEEPER_ROOT = "/metron"; public static final String ZOOKEEPER_TOPOLOGY_ROOT = ZOOKEEPER_ROOT + "/topology"; public static final long DEFAULT_CONFIGURED_BOLT_TIMEOUT = 5000; http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java index b104bba..df80691 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java @@ -103,21 +103,21 @@ public class FieldTransformer implements Serializable { } } - public Map<String, Object> transform(JSONObject input, Map<String, Object> sensorConfig, Context context) { + public Map<String, Object> transform(JSONObject input, Context context, Map<String, Object>... 
sensorConfig) { if(getInput() == null || getInput().isEmpty()) { - return transformation.map(input, getOutput(), config, sensorConfig, context); + return transformation.map(input, getOutput(), config, context, sensorConfig); } else { Map<String, Object> in = new HashMap<>(); for(String inputField : getInput()) { in.put(inputField, input.get(inputField)); } - return transformation.map(in, getOutput(), config, sensorConfig, context); + return transformation.map(in, getOutput(), config, context, sensorConfig); } } - public void transformAndUpdate(JSONObject message, Map<String, Object> sensorConfig, Context context) { - Map<String, Object> currentValue = transform(message, sensorConfig, context); + public void transformAndUpdate(JSONObject message, Context context, Map<String, Object>... sensorConfig) { + Map<String, Object> currentValue = transform(message, context, sensorConfig); if(currentValue != null) { for(Map.Entry<String, Object> kv : currentValue.entrySet()) { if(kv.getValue() == null) { http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java index e49c99b..d72f462 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java @@ -35,6 +35,24 @@ public class SensorParserConfig implements Serializable { private String writerClassName; private String errorWriterClassName; private String invalidWriterClassName; + private Boolean readMetadata = false; + private Boolean mergeMetadata = false; + 
+ public Boolean mergeMetadata() { + return mergeMetadata; + } + + public void setMergeMetadata(Boolean mergeMetadata) { + this.mergeMetadata = mergeMetadata; + } + + public Boolean readMetadata() { + return readMetadata; + } + + public void setReadMetadata(Boolean readMetadata) { + this.readMetadata = readMetadata; + } public String getErrorWriterClassName() { return errorWriterClassName; @@ -129,6 +147,8 @@ public class SensorParserConfig implements Serializable { ", invalidWriterClassName='" + invalidWriterClassName + '\'' + ", parserConfig=" + parserConfig + ", fieldTransformations=" + fieldTransformations + + ", readMetadata=" + readMetadata + + ", mergeMetadata=" + mergeMetadata + '}'; } @@ -153,6 +173,10 @@ public class SensorParserConfig implements Serializable { return false; if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null) return false; + if (readMetadata() != null ? !readMetadata().equals(that.readMetadata()) : that.readMetadata() != null) + return false; + if (mergeMetadata() != null ? !mergeMetadata().equals(that.mergeMetadata()) : that.mergeMetadata() != null) + return false; return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null; } @@ -167,6 +191,8 @@ public class SensorParserConfig implements Serializable { result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0); result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0); result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0); + result = 31 * result + (readMetadata() != null ? readMetadata().hashCode() : 0); + result = 31 * result + (mergeMetadata() != null ? 
mergeMetadata().hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java index d75df55..138e228 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformation.java @@ -29,7 +29,7 @@ public interface FieldTransformation extends Serializable { Map<String, Object> map( Map<String, Object> input , List<String> outputField , LinkedHashMap<String, Object> fieldMappingConfig - , Map<String, Object> sensorConfig , Context context + , Map<String, Object>... 
sensorConfig ); } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java index a94ccd8..1c875b4 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RemoveTransformation.java @@ -68,8 +68,8 @@ public class RemoveTransformation implements FieldTransformation { public Map<String, Object> map( Map<String, Object> input , final List<String> outputFields , LinkedHashMap<String, Object> fieldMappingConfig - , Map<String, Object> sensorConfig , Context context + , Map<String, Object>... 
sensorConfig ) { String condition = getCondition(fieldMappingConfig); StellarPredicateProcessor processor = getPredicateProcessor(condition); http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java index 252e5e5..9ccbdc7 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/SimpleFieldTransformation.java @@ -30,8 +30,8 @@ public abstract class SimpleFieldTransformation implements FieldTransformation { public Map<String, Object> map (Map<String, Object> input , List<String> outputField , LinkedHashMap<String, Object> fieldMappingConfig - , Map<String, Object> sensorConfig , Context context + , Map<String, Object>... 
sensorConfig ) { Object value = (input == null) http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java index 54c7236..a5d8689 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java @@ -31,14 +31,15 @@ public class StellarTransformation implements FieldTransformation { public Map<String, Object> map( Map<String, Object> input , List<String> outputField , LinkedHashMap<String, Object> fieldMappingConfig - , Map<String, Object> sensorConfig , Context context + , Map<String, Object>... 
sensorConfig ) { Map<String, Object> ret = new HashMap<>(); Map<String, Object> intermediateVariables = new HashMap<>(); Set<String> outputs = new HashSet<>(outputField); - VariableResolver resolver = new MapVariableResolver(ret, intermediateVariables, input, sensorConfig); + MapVariableResolver resolver = new MapVariableResolver(ret, intermediateVariables, input); + resolver.add(sensorConfig); StellarProcessor processor = new StellarProcessor(); for(Map.Entry<String, Object> kv : fieldMappingConfig.entrySet()) { String oField = kv.getKey(); http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java index 57de964..71a0298 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java @@ -43,8 +43,8 @@ public class FieldTransformationTest { public Map<String, Object> map( Map<String, Object> input , List<String> outputField , LinkedHashMap<String, Object> fieldMappingConfig - , Map<String, Object> sensorConfig , Context context + , Map<String, Object>... 
sensorConfig ) { return ImmutableMap.of(outputField.get(0), Joiner.on(fieldMappingConfig.get("delim").toString()).join(input.entrySet())); @@ -134,8 +134,8 @@ public class FieldTransformationTest { ,"field2", "value2" ) ) - , c.getParserConfig() , Context.EMPTY_CONTEXT() + , c.getParserConfig() ) ); } @@ -146,7 +146,10 @@ public class FieldTransformationTest { Assert.assertNotNull(handler); Assert.assertEquals(ImmutableMap.of("protocol", "TCP") - ,handler.transform(new JSONObject(ImmutableMap.of("protocol", 6)), c.getParserConfig(), Context.EMPTY_CONTEXT()) + ,handler.transform(new JSONObject(ImmutableMap.of("protocol", 6)) + , Context.EMPTY_CONTEXT() + , c.getParserConfig() + ) ); } } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java index 72f7480..a8e6d71 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RemoveTransformationTest.java @@ -51,7 +51,7 @@ public class RemoveTransformationTest { JSONObject input = new JSONObject(new HashMap<String, Object>() {{ put("field1", "foo"); }}); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); Assert.assertFalse(input.containsKey("field1")); } @@ -78,7 +78,7 @@ public class RemoveTransformationTest { JSONObject input = new JSONObject(new HashMap<String, Object>() {{ put("field1", "foo"); }}); - 
handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); //no removal happened because field2 does not exist Assert.assertTrue(input.containsKey("field1")); Assert.assertFalse(input.containsKey("field2")); @@ -88,7 +88,7 @@ public class RemoveTransformationTest { put("field1", "foo"); put("field2", "bar"); }}); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); //no removal happened because field2 != bar Assert.assertTrue(input.containsKey("field1")); Assert.assertTrue(input.containsKey("field2")); @@ -99,7 +99,7 @@ public class RemoveTransformationTest { put("field2", "foo"); }}); //removal of field1 happens because field2 exists and is 'bar' - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); Assert.assertFalse(input.containsKey("field1")); Assert.assertTrue(input.containsKey("field2")); } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java index 01008ca..12f8b5c 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java @@ -73,7 +73,7 @@ public class StellarTransformationTest { SensorParserConfig c = 
SensorParserConfig.fromBytes(Bytes.toBytes(configNumericDomain)); FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null); JSONObject input = new JSONObject(); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); Assert.assertTrue(input.containsKey("full_hostname")); Assert.assertEquals("1234567890123456789012345678901234567890123456789012345678901234567890", input.get("full_hostname")); Assert.assertFalse(input.containsKey("domain_without_subdomains")); @@ -87,7 +87,7 @@ public class StellarTransformationTest { FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null); JSONObject input = new JSONObject(); try { - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); } catch(IllegalStateException ex) { Assert.assertTrue(ex.getMessage().contains("URL_TO_HOST")); @@ -138,7 +138,7 @@ public class StellarTransformationTest { FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null); JSONObject input = new JSONObject(new HashMap<String, Object>() {{ }}); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); int expected = 3; Assert.assertEquals(expected, input.get("final_value")); Assert.assertFalse(input.containsKey("value1")); @@ -171,7 +171,7 @@ public class StellarTransformationTest { JSONObject input = new JSONObject(new HashMap<String, Object>() {{ put("timestamp", "2016-01-05 17:02:30"); }}); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); long expected = 1452013350000L; Assert.assertEquals(expected, input.get("utc_timestamp")); Assert.assertTrue(input.containsKey("timestamp")); @@ -190,7 +190,7 @@ public class StellarTransformationTest { 
JSONObject input = new JSONObject(new HashMap<String, Object>() {{ put("timestamp", "2016-01-05 17:02:30"); }}); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); long expected = 1452013350000L; Assert.assertEquals(expected, input.get("utc_timestamp")); Assert.assertTrue(input.containsKey("timestamp")); @@ -209,7 +209,7 @@ public class StellarTransformationTest { //no input fields => no transformation JSONObject input = new JSONObject(new HashMap<String, Object>() {{ }}); - handler.transformAndUpdate(input, new HashMap<>(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); Assert.assertFalse(input.containsKey("utc_timestamp")); Assert.assertTrue(input.isEmpty()); } @@ -260,7 +260,7 @@ public class StellarTransformationTest { //looking up the data center in portland, which doesn't exist in the map, so we default to UTC put("dc", "portland"); }}); - handler.transformAndUpdate(input, c.getParserConfig(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT()); long expected = 1452013350000L; Assert.assertEquals(expected, input.get("utc_timestamp")); Assert.assertEquals("caseystella.com", input.get("url_host")); @@ -275,7 +275,7 @@ public class StellarTransformationTest { put("url", "https://caseystella.com/blog"); put("dc", "london"); }}); - handler.transformAndUpdate(input, c.getParserConfig(), Context.EMPTY_CONTEXT()); + handler.transformAndUpdate(input, Context.EMPTY_CONTEXT(), c.getParserConfig()); long expected = 1452013350000L; Assert.assertEquals(expected, input.get("utc_timestamp")); Assert.assertEquals("caseystella.com", input.get("url_host")); @@ -289,7 +289,7 @@ public class StellarTransformationTest { put("timestamp", "2016-01-05 17:02:30"); put("url", "https://caseystella.com/blog"); }}); - handler.transformAndUpdate(input, c.getParserConfig(), Context.EMPTY_CONTEXT()); + 
handler.transformAndUpdate(input, Context.EMPTY_CONTEXT(), c.getParserConfig()); long expected = 1452013350000L; Assert.assertEquals(expected, input.get("utc_timestamp")); Assert.assertEquals("caseystella.com", input.get("url_host")); http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java index 0587000..1f59ae0 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserConfigFunctionsTest.java @@ -64,7 +64,7 @@ public class ParserConfigFunctionsTest { sensorParserConfig.init(); for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) { if (handler != null) { - handler.transformAndUpdate(ret, sensorParserConfig.getParserConfig(), context); + handler.transformAndUpdate(ret, context, sensorParserConfig.getParserConfig()); } } return ret; http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-parsers/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md index e5d97e0..f03abdf 100644 --- a/metron-platform/metron-parsers/README.md +++ b/metron-platform/metron-parsers/README.md @@ -20,8 +20,14 @@ There are two general types types of parsers: * `timestampFormat` : The date format of the timestamp to use. If unspecified, the parser assumes the timestamp is ms since unix epoch. 
* `columns` : A map of column names you wish to extract from the CSV to their offsets (e.g. `{ 'name' : 1, 'profession' : 3}` would be a column map for extracting the 2nd and 4th columns from a CSV) * `separator` : The column separator, `,` by default. -just - + * JSON Map Parser: `org.apache.metron.parsers.json.JSONMapParser` with possible `parserConfig` entries of + * `mapStrategy` : A strategy to indicate how to handle multi-dimensional Maps. This is one of + * `DROP` : Drop fields which contain maps + * `UNFOLD` : Unfold inner maps. So `{ "foo" : { "bar" : 1} }` would turn into `{"foo.bar" : 1}` + * `ALLOW` : Allow multidimensional maps + * `ERROR` : Throw an error when a multidimensional map is encountered + * A field called `timestamp` is expected to exist and, if it does not, then current time is inserted. + ## Parser Architecture  @@ -91,7 +97,10 @@ Example Stellar Filter which includes messages which contain a the `field1` fiel } } ``` -* `sensorTopic` : The kafka topic to send the parsed messages to. +* `sensorTopic` : The kafka topic to send the parsed messages to. If the topic is prefixed and suffixed by `/` +then it is assumed to be a regex and will match any topic matching the pattern (e.g. `/bro.*/` would match `bro_cust0`, `bro_cust1` and `bro_cust2`) +* `readMetadata` : Boolean indicating whether to read metadata or not (`false` by default). See below for a discussion about metadata. +* `mergeMetadata` : Boolean indicating whether to merge metadata with the message or not (`false` by default). See below for a discussion about metadata. * `parserConfig` : A JSON Map representing the parser implementation specific configuration. * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic. @@ -101,6 +110,44 @@ transformation which can be done to a message. 
This transformation can * Add new fields given the values of existing fields of a message * Remove existing fields of a message +### Metadata + +Metadata is a useful thing to send to Metron and use during enrichment or threat intelligence. +Consider the following scenarios: +* You have multiple telemetry sources of the same type that you want to + * ensure downstream analysts can differentiate + * ensure profiles consider them independently as they have different seasonality or some other fundamental characteristic + +As such, there are two types of metadata that we seek to support in Metron: +* Environmental metadata : Metadata about the system at large + * Consider the possibility that you have multiple kafka topics being processed by one parser and you want to tag the messages with the kafka topic + * At the moment, the kafka topic is the only environmental metadata kept; it is stored under the field name `topic`. +* Custom metadata: Custom metadata from an individual telemetry source that one might want to use within Metron. + +Metadata is controlled by two fields in the parser: +* `readMetadata` : This is a boolean indicating whether metadata will be read and made available to Field +transformations (i.e. Stellar field transformations). The default is `false`. +* `mergeMetadata` : This is a boolean indicating whether metadata fields will be merged with the message automatically. +That is to say, if this property is set to `true` then every metadata field will become part of the messages and, +consequently, also available for use in field transformations. +#### Field Naming + +In order to avoid collisions from metadata fields, metadata fields will be prefixed with `metron.metadata.`. +So, for instance the kafka topic would be in the field `metron.metadata.topic`. + +#### Specifying Custom Metadata +Custom metadata is specified by sending a JSON Map in the kafka key. If no key is sent, then, obviously, no metadata will be parsed. 
+For instance, sending a metadata field called `customer_id` could be done by sending +``` +{ +"customer_id" : "my_customer_id" +} +``` +in the kafka key. This would be exposed as the field `metron.metadata.customer_id` to stellar field transformations +and, if `mergeMetadata` is `true`, would also be available as a field in its own right. + + + ### `fieldTransformation` configuration The format of a `fieldTransformation` is as follows: http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java index 6fe2ff0..d78487b 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java @@ -17,6 +17,7 @@ */ package org.apache.metron.parsers.bolt; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.lang3.StringUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.bolt.ConfiguredParserBolt; @@ -29,6 +30,7 @@ import org.apache.metron.common.error.MetronError; import org.apache.metron.common.message.MessageGetStrategy; import org.apache.metron.common.message.MessageGetters; import org.apache.metron.common.utils.ErrorUtils; +import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.parsers.filters.Filters; import org.apache.metron.parsers.interfaces.MessageFilter; import org.apache.metron.parsers.interfaces.MessageParser; @@ -41,18 +43,16 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; 
-import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; +import static org.apache.metron.common.Constants.METADATA_PREFIX; + public class ParserBolt extends ConfiguredParserBolt implements Serializable { + private static final int KEY_INDEX = 1; private static final Logger LOG = LoggerFactory.getLogger(ParserBolt.class); private OutputCollector collector; private MessageParser<JSONObject> parser; @@ -121,6 +121,39 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable { StellarFunctions.initialize(stellarContext); } + private Map<String, Object> getMetadata(Tuple t, boolean readMetadata) { + Map<String, Object> ret = new HashMap<>(); + if(!readMetadata) { + return ret; + } + Fields tupleFields = t.getFields(); + for(int i = 2;i < tupleFields.size();++i) { + String envMetadataFieldName = tupleFields.get(i); + Object envMetadataFieldValue = t.getValue(i); + if(!StringUtils.isEmpty(envMetadataFieldName) && envMetadataFieldValue != null) { + ret.put(METADATA_PREFIX + envMetadataFieldName, envMetadataFieldValue); + } + } + byte[] keyObj = t.getBinary(KEY_INDEX); + String keyStr = null; + try { + keyStr = keyObj == null?null:new String(keyObj); + if(!StringUtils.isEmpty(keyStr)) { + Map<String, Object> metadata = JSONUtils.INSTANCE.load(keyStr, new TypeReference<Map<String, Object>>() { + }); + for(Map.Entry<String, Object> kv : metadata.entrySet()) { + ret.put(METADATA_PREFIX + kv.getKey(), kv.getValue()); + } + + } + } catch (IOException e) { + String reason = "Unable to parse metadata; expected JSON Map: " + (keyStr == null?"NON-STRING!":keyStr); + LOG.error(reason, e); + throw new IllegalStateException(reason, e); + } + return ret; + } + @SuppressWarnings("unchecked") @Override public void execute(Tuple tuple) { @@ -132,18 +165,29 @@ public class ParserBolt extends ConfiguredParserBolt 
implements Serializable { boolean ackTuple = !writer.handleAck(); int numWritten = 0; if(sensorParserConfig != null) { + Map<String, Object> metadata = getMetadata(tuple, sensorParserConfig.readMetadata()); List<FieldValidator> fieldValidations = getConfigurations().getFieldValidations(); Optional<List<JSONObject>> messages = parser.parseOptional(originalMessage); for (JSONObject message : messages.orElse(Collections.emptyList())) { message.put(Constants.SENSOR_TYPE, getSensorType()); + if(sensorParserConfig.mergeMetadata()) { + message.putAll(metadata); + } for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) { if (handler != null) { - handler.transformAndUpdate(message, sensorParserConfig.getParserConfig(), stellarContext); + if(!sensorParserConfig.mergeMetadata()) { + //if we haven't merged metadata, then we need to pass them along as configuration params. + handler.transformAndUpdate(message, stellarContext, sensorParserConfig.getParserConfig(), metadata); + } + else { + handler.transformAndUpdate(message, stellarContext, sensorParserConfig.getParserConfig()); + } } } if(!message.containsKey(Constants.GUID)) { message.put(Constants.GUID, UUID.randomUUID().toString()); } + if (parser.validate(message) && (filter == null || filter.emitTuple(message, stellarContext))) { numWritten++; List<FieldValidator> failedValidators = getFailedValidators(message, fieldValidations); http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index 0c88573..feac80b 100644 --- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -132,7 +132,14 @@ public class ParserTopologyBuilder { if(securityProtocol.isPresent()) { kafkaSpoutConfigOptions.putIfAbsent("security.protocol", securityProtocol.get()); } - return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, Arrays.asList("value"), kafkaSpoutConfigOptions); + return SimpleStormKafkaBuilder.create( inputTopic + , zkQuorum + , Arrays.asList( SimpleStormKafkaBuilder.FieldsConfiguration.VALUE.getFieldName() + , SimpleStormKafkaBuilder.FieldsConfiguration.KEY.getFieldName() + , SimpleStormKafkaBuilder.FieldsConfiguration.TOPIC.getFieldName() + ) + , kafkaSpoutConfigOptions + ); } private static KafkaWriter createKafkaWriter( Optional<String> broker http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java index a6823e6..21d5110 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java @@ -22,6 +22,7 @@ import com.google.common.base.Joiner; import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer; import org.apache.storm.kafka.Callback; import org.apache.storm.kafka.EmitContext; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,12 +159,18 @@ public class HDFSWriterCallback implements 
Callback { @Override public void initialize(EmitContext context) { this.context = context; - Object topics = context.get(EmitContext.Type.TOPIC); - if(topics instanceof List) { - this.topic = Joiner.on(",").join((List<String>)topics); + KafkaSpoutConfig spoutConfig = context.get(EmitContext.Type.SPOUT_CONFIG); + if(spoutConfig != null && spoutConfig.getSubscription() != null) { + this.topic = spoutConfig.getSubscription().getTopicsString(); + if(this.topic.length() > 0) { + int len = this.topic.length(); + if(this.topic.charAt(0) == '[' && this.topic.charAt(len - 1) == ']') { + this.topic = this.topic.substring(1, len - 1); + } + } } else { - this.topic = "" + topics; + throw new IllegalStateException("Unable to initialize, because spout config is not correctly specified"); } } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java index 592859e..4db1302 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java @@ -19,6 +19,7 @@ package org.apache.metron.storm.kafka.flux; import com.google.common.base.Joiner; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -32,8 +33,10 @@ import org.apache.storm.topology.OutputFieldsGetter; import org.apache.storm.tuple.Fields; import 
org.apache.storm.tuple.Values; +import java.io.Serializable; import java.util.*; import java.util.function.Function; +import java.util.regex.Pattern; /** * This is a convenience layer on top of the KafkaSpoutConfig.Builder available in storm-kafka-client. @@ -64,6 +67,10 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V this.fieldName = fieldName; } + public String getFieldName() { + return fieldName; + } + /** * Return a list of the enums * @param configs @@ -147,12 +154,11 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V public static String DEFAULT_DESERIALIZER = ByteArrayDeserializer.class.getName(); - private String topic; /** * Create an object with the specified properties. This will expose fields "key" and "value." * @param kafkaProps The special kafka properties - * @param topic The kafka topic. TODO: In the future, support multiple topics and regex patterns. + * @param topic The kafka topic. * @param zkQuorum The zookeeper quorum. We will use this to pull the brokers from this. */ public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps @@ -176,16 +182,48 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V , List<String> fieldsConfiguration ) { + this(kafkaProps, toSubscription(topic), zkQuorum, fieldsConfiguration); + } + + /** + * Create an object with the specified properties and exposing the specified fields. + * @param kafkaProps The special kafka properties + * @param subscription The subscription to the kafka topic(s) + * @param zkQuorum The zookeeper quorum. We will use this to pull the brokers from this. + * @param fieldsConfiguration The fields to expose in the storm tuple emitted. 
+ */ + public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps + , Subscription subscription + , String zkQuorum + , List<String> fieldsConfiguration + ) + { super( getBootstrapServers(zkQuorum, kafkaProps) , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER) , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER) - , topic + , subscription ); setProp(kafkaProps); setRecordTranslator(new SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration))); - this.topic = topic; } + + private static Subscription toSubscription(String topicOrSubscription) { + if (StringUtils.isEmpty(topicOrSubscription)) { + throw new IllegalArgumentException("Topic name is invalid (empty or null): " + topicOrSubscription); + } + int length = topicOrSubscription.length(); + if(topicOrSubscription.charAt(0) == '/' && topicOrSubscription.charAt(length - 1) == '/') { + //pattern, so strip off the preceding and ending slashes + String substr = topicOrSubscription.substring(1, length - 1); + return new PatternSubscription(Pattern.compile(substr)); + } + else { + return new NamedSubscription(topicOrSubscription); + } + } + + private static <T> Class<Deserializer<T>> createDeserializer( Optional<String> deserializerClass , String defaultDeserializerClass ) @@ -210,14 +248,6 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V } /** - * Get the kafka topic. TODO: In the future, support multiple topics and regex patterns. - * @return - */ - public String getTopic() { - return topic; - } - - /** * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields. Also, configure the spout * using a Map that configures both kafka as well as the spout (see the properties in SpoutConfiguration). 
* @param topic http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java index 514a21d..8c66a92 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java @@ -35,13 +35,10 @@ import org.apache.storm.kafka.spout.KafkaSpoutConfig; public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> { private static final Logger LOG = Logger.getLogger(StormKafkaSpout.class); protected KafkaSpoutConfig<K,V> _spoutConfig; - protected String _topic; - protected AtomicBoolean isShutdown = new AtomicBoolean(false); public StormKafkaSpout(SimpleStormKafkaBuilder<K,V> builder) { super(builder.build()); - this._topic = builder.getTopic(); this._spoutConfig = builder.build(); } @@ -71,5 +68,13 @@ public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> { //see https://issues.apache.org/jira/browse/STORM-2184 LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we); } + catch(IllegalStateException ise) { + if(ise.getMessage().contains("This consumer has already been closed")) { + LOG.warn(ise.getMessage()); + } + else { + throw ise; + } + } } } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java index 8592e13..acd7c5f 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java @@ -49,8 +49,8 @@ public class CallbackKafkaSpout<K, V> extends StormKafkaSpout<K, V> { public void initialize(TopologyContext context) { _callback = createCallback(callbackClazz); _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig) - .with(EmitContext.Type.UUID, context.getStormId()) - .with(EmitContext.Type.TOPIC, _topic); + .with(EmitContext.Type.UUID, context.getStormId()) + ; _callback.initialize(_context); } http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java index eac3ea8..b53cd9d 100644 --- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java @@ -35,7 +35,6 @@ public class EmitContext implements Cloneable,Serializable { */ public enum Type{ STREAM_ID(String.class) - ,TOPIC(String.class) //TODO: This should be pulled from the message directly with the new spout when we want to support multiple topics. 
,PARTITION(Integer.class) ,TASK_ID(Integer.class) ,UUID(String.class) http://git-wip-us.apache.org/repos/asf/metron/blob/dcec5a7c/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java index 5e9ffff..fdf740c 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/MapVariableResolver.java @@ -30,12 +30,19 @@ public class MapVariableResolver implements VariableResolver { if(variableMappingOne != null) { variableMappings.add(variableMappingOne); } - for(Map m : variableMapping) { - if(m != null) { - this.variableMappings.add(m); + add(variableMapping); + } + + public void add(Map... ms) { + if(ms != null) { + for (Map m : ms) { + if (m != null) { + this.variableMappings.add(m); + } } } } + @Override public Object resolve(String variable) { for(Map variableMapping : variableMappings) {
