GitHub user nickwallen opened a pull request: https://github.com/apache/incubator-metron/pull/449
METRON-701 Triage Metrics Produced by the Profiler ## [METRON-701](https://issues.apache.org/jira/browse/METRON-701) Please do not merge. I am looking for feedback on this pull request. I will comment on the specific area of code below. ### Problem The motivating example is that I would like to create an alert if the number of inbound flows to any host over a 15 minute interval is abnormal. The value being interrogated here, the number of inbound flows, is not a static value contained within any single telemetry message. This value is calculated across multiple messages by the Profiler. The current Threat Triage process cannot be used to interrogate values calculated by the Profiler. ### Proposed Solution I am proposing that we treat the Profiler as a source of telemetry. The measurements captured by the Profiler would be enqueued into a Kafka topic. We would then treat those Profiler messages like any other telemetry. We would parse, enrich, triage, and index those messages. ### Testing * If you are testing in the "Quick Dev" environment, increase the number of slots available to at least 5. This can be done within Ambari by editing Storm's `supervisor.slots.ports` property. ``` supervisor.slots.ports = [6700, 6701, 6702, 6703, 6704] ``` * Alter the Profiler topology settings at `config/profiler.properties` to use a 1 minute period duration. This is not necessary, but is useful for the impatient. ``` profiler.period.duration=1 profiler.period.duration.units=MINUTES ``` * Start only the Bro, Enrichment, and Profiler topologies. Stop other unnecessary sevices like Elasticsearch and Kibana. ``` service sensor-stubs start bro $METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s bro $METRON_HOME/bin/start_enrichment_topology.sh $METRON_HOME/bin/start_profiler_topology.sh ``` * Create a Profile that simply counts messages. ``` { "profiles": [ { "profile": "test", "foreach": "'global'", "onlyif": "source.type == 'bro'", "init": { "count": "0" }, "update": { "count": "count + 1" }, "result": "count" } ] } ``` * Look for messages to hit the 'profiles' topic in Kafka. ``` $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic profiles --from-beginning {"profileName":"test","period":{"period":24776383,"startTimeMillis":1486582980000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":156,"entity":"global"} {"profileName":"test","period":{"period":24776384,"startTimeMillis":1486583040000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":158,"entity":"global"} {"profileName":"test","period":{"period":24776385,"startTimeMillis":1486583100000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":152,"entity":"global"} {"profileName":"test","period":{"period":24776386,"startTimeMillis":1486583160000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":158,"entity":"global"} {"profileName":"test","period":{"period":24776387,"startTimeMillis":1486583220000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":167,"entity":"global"} {"profileName":"test","period":{"period":24776388,"startTimeMillis":1486583280000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":144,"entity":"global"} ``` * Ensure that the Profiler continues to write to HBase. ``` $ bin/stellar -z node1:2181 Stellar, Go! Please note that functions are loading lazily in the background and will be unavailable until loaded fully. {es.clustername=metron, es.ip=node1, es.port=9300, es.date.format=yyyy.MM.dd.HH, bootstrap.servers=node1:6667, profiler.client.period.duration=1, profiler.client.period.duration.units=MINUTES} [Stellar]>>> [Stellar]>>> PROFILE_GET("test", "global", 5, "MINUTES") [146, 147, 142, 120, 151] ``` * Start a parser to consume the profile messages. ``` /usr/metron/0.3.0/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s profiles ``` * The parser should be adding `source.type=profiles` and `is_alert=true`. ``` $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic enrichments --from-beginning | grep profileName {"profileName":"test","original_string":"{\"profileName\":\"test\",\"period\":{\"period\":24776474,\"startTimeMillis\":1486588440000,\"durationMillis\":60000},\"groups\":[],\"definition\":{\"result\":\"count\",\"foreach\":\"'global'\",\"init\":{\"count\":\"0\"},\"onlyif\":\"source.type == 'bro'\",\"profile\":\"test\",\"destination\":[\"hbase\",\"kafka\"],\"update\":{\"count\":\"count + 1\"},\"groupBy\":[]},\"value\":156,\"entity\":\"global\"}","groups":[],"is_alert":true,"value":156,"entity":"global","timestamp":1486588475670,"source.type":"profiles"} ``` * Add a threat triage rule that applies to the measurements coming out of the Profiler. Run the following in a Stellar REPL. ``` # fetch the current config conf := CONFIG_GET("ENRICHMENT", "profiles") # create a rule rule := profileName == "test" and value > 100 triage := { "name":"Test Rule", "rule": SHELL_GET_EXPRESSION('rule'), "score":"10" } conf := THREAT_TRIAGE_ADD(conf, [triage]) # save the configuration CONFIG_PUT("ENRICHMENT", conf, "profiles") ``` * Ensure that the threat triage rule is being applied correctly. Look for a key named `threat.triage.level` when `value > 100`. ``` $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic indexing | grep profileName {"profileName":"test","enrichmentsplitterbolt.splitter.end.ts":"1486589135689","enrichmentsplitterbolt.splitter.begin.ts":"1486589135689","groups":[],"is_alert":"true","source.type":"profiles","original_string":"{\"profileName\":\"test\",\"period\":{\"period\":24776485,\"startTimeMillis\":1486589100000,\"durationMillis\":60000},\"groups\":[],\"definition\":{\"result\":\"count\",\"foreach\":\"'global'\",\"init\":{\"count\":\"0\"},\"onlyif\":\"source.type == 'bro'\",\"profile\":\"test\",\"destination\":[\"hbase\",\"kafka\"],\"update\":{\"count\":\"count + 1\"},\"groupBy\":[]},\"value\":160,\"entity\":\"global\"}","threatintelsplitterbolt.splitter.end.ts":"1486589135694","threat.triage.level":10.0,"threatinteljoinbolt.joiner.ts":"1486589135697","enrichmentjoinbolt.joiner.ts":"1486589135692","threatintelsplitterbolt.splitter.begin.ts":"1486589135694","value":160,"entity":"global","timestamp":1486589135684} ``` These steps have shown how metrics produced by the Profiler can be triaged using the existing Threat Triage mechanism in Metron. You can merge this pull request into a Git repository by running: $ git pull https://github.com/nickwallen/incubator-metron METRON-701 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-metron/pull/449.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #449 ---- commit 88d355a7e9168575193fd105c331dbf67351e0a8 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-07T19:48:18Z METRON-701 Bulk writer expects a field named 'message' that is a JSONObject commit e4e7e54c8332f9838f36ff2871fc61e9db6c661d Author: Nick Allen <n...@nickallen.org> Date: 2017-02-07T20:27:58Z METRON-701 Embedded the profile definition in the profile measurement itself. Also added a hack because the KafkaBolt requires a JSONObject commit fde7dfd56e2d555b4658c0abdd7f233aa37817b2 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-08T14:19:20Z METRON-701 Moved handling of default value of 'onlyif' to the ProfileConfig class commit aa52e98327efaead5d9a2169a2d4e39be8889dd4 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-08T15:50:13Z METRON-701 kafka writer expects a JSONObject commit c04d8a326b302f207e6fef2842cad4e4f8f45bae Author: Nick Allen <n...@nickallen.org> Date: 2017-02-08T17:04:08Z METRON-701 Introduced the DestinationHandler to simplify the differences between downstream destinations like Kafka and HBase commit f46a5d196f9ee0d0cbea70b36caf60fe66716122 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-08T17:04:27Z METRON-701 The message should not be used in field grouping commit a81d3120d4a7832bec879ec3f861670ab49e5aab Author: Nick Allen <n...@nickallen.org> Date: 2017-02-08T21:36:13Z METRON-701 Properly serializing the ProfileMeasurement. Added documentation for new options commit 6593481840d2348b7c86630f8d50cf755fa2b6f5 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-08T21:57:19Z METRON-701 Fixed up integration test commit c4744aadd224917c88dc409882024b27754b6836 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T14:11:59Z METRON-701 Added check for invalid destinations commit fe136d8adc0e6b6b67bc12e259088b76c5d6eecc Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T14:15:22Z METRON-701 Fixed exception message commit 6b221df9fe206f225953df428e0e28fba3541aab Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T14:22:18Z METRON-701 Allowing the streamId to be set, just in case. commit 152b75c3e38e1fde82c004209cce7d81a6529629 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T17:11:15Z METRON-701 Rename ttl commit 8cc8008e208cc12a2399443ca6f8abe949d952e1 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T19:07:38Z Added README example 4 to integration tests. Added missing license headers commit e766c4806033387d50b1992a5a0dff9814b19370 Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T20:07:40Z METRON-701 Fixed dependencies list commit 5d329e64d89ed747d727a6cc2b8a1489f4db2dde Author: Nick Allen <n...@nickallen.org> Date: 2017-02-09T20:28:37Z METRON-701 Fix up comments ---- --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---