[
https://issues.apache.org/jira/browse/METRON-701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15860151#comment-15860151
]
ASF GitHub Bot commented on METRON-701:
---------------------------------------
GitHub user nickwallen opened a pull request:
https://github.com/apache/incubator-metron/pull/449
METRON-701 Triage Metrics Produced by the Profiler
## [METRON-701](https://issues.apache.org/jira/browse/METRON-701)
Please do not merge. I am looking for feedback on this pull request. I
will comment on the specific area of code below.
### Problem
The motivating example is that I would like to create an alert if the
number of inbound flows to any host over a 15 minute interval is abnormal.
The value being interrogated here, the number of inbound flows, is not a
static value contained within any single telemetry message. This value is
calculated across multiple messages by the Profiler. The current Threat Triage
process cannot be used to interrogate values calculated by the Profiler.
### Proposed Solution
I am proposing that we treat the Profiler as a source of telemetry. The
measurements captured by the Profiler would be enqueued into a Kafka topic. We
would then treat those Profiler messages like any other telemetry. We would
parse, enrich, triage, and index those messages.
### Testing
* If you are testing in the "Quick Dev" environment, increase the number of
slots available to at least 5. This can be done within Ambari by editing
Storm's `supervisor.slots.ports` property.
```
supervisor.slots.ports = [6700, 6701, 6702, 6703, 6704]
```
* Alter the Profiler topology settings at `config/profiler.properties` to
use a 1 minute period duration. This is not necessary, but is useful for the
impatient.
```
profiler.period.duration=1
profiler.period.duration.units=MINUTES
```
* Start only the Bro, Enrichment, and Profiler topologies. Stop other
unnecessary sevices like Elasticsearch and Kibana.
```
service sensor-stubs start bro
$METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s bro
$METRON_HOME/bin/start_enrichment_topology.sh
$METRON_HOME/bin/start_profiler_topology.sh
```
* Create a Profile that simply counts messages.
```
{
"profiles": [
{
"profile": "test",
"foreach": "'global'",
"onlyif": "source.type == 'bro'",
"init": { "count": "0" },
"update": { "count": "count + 1" },
"result": "count"
}
]
}
```
* Look for messages to hit the 'profiles' topic in Kafka.
```
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper
node1:2181 --topic profiles --from-beginning
{"profileName":"test","period":{"period":24776383,"startTimeMillis":1486582980000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count
+ 1"},"groupBy":[]},"value":156,"entity":"global"}
{"profileName":"test","period":{"period":24776384,"startTimeMillis":1486583040000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count
+ 1"},"groupBy":[]},"value":158,"entity":"global"}
{"profileName":"test","period":{"period":24776385,"startTimeMillis":1486583100000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count
+ 1"},"groupBy":[]},"value":152,"entity":"global"}
{"profileName":"test","period":{"period":24776386,"startTimeMillis":1486583160000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count
+ 1"},"groupBy":[]},"value":158,"entity":"global"}
{"profileName":"test","period":{"period":24776387,"startTimeMillis":1486583220000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count
+ 1"},"groupBy":[]},"value":167,"entity":"global"}
{"profileName":"test","period":{"period":24776388,"startTimeMillis":1486583280000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count
+ 1"},"groupBy":[]},"value":144,"entity":"global"}
```
* Ensure that the Profiler continues to write to HBase.
```
$ bin/stellar -z node1:2181
Stellar, Go!
Please note that functions are loading lazily in the background and will be
unavailable until loaded fully.
{es.clustername=metron, es.ip=node1, es.port=9300,
es.date.format=yyyy.MM.dd.HH, bootstrap.servers=node1:6667,
profiler.client.period.duration=1,
profiler.client.period.duration.units=MINUTES}
[Stellar]>>>
[Stellar]>>> PROFILE_GET("test", "global", 5, "MINUTES")
[146, 147, 142, 120, 151]
```
* Start a parser to consume the profile messages.
```
/usr/metron/0.3.0/bin/start_parser_topology.sh -k node1:6667 -z node1:2181
-s profiles
```
* The parser should be adding `source.type=profiles` and `is_alert=true`.
```
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper
node1:2181 --topic enrichments --from-beginning | grep profileName
{"profileName":"test","original_string":"{\"profileName\":\"test\",\"period\":{\"period\":24776474,\"startTimeMillis\":1486588440000,\"durationMillis\":60000},\"groups\":[],\"definition\":{\"result\":\"count\",\"foreach\":\"'global'\",\"init\":{\"count\":\"0\"},\"onlyif\":\"source.type
==
'bro'\",\"profile\":\"test\",\"destination\":[\"hbase\",\"kafka\"],\"update\":{\"count\":\"count
+
1\"},\"groupBy\":[]},\"value\":156,\"entity\":\"global\"}","groups":[],"is_alert":true,"value":156,"entity":"global","timestamp":1486588475670,"source.type":"profiles"}
```
* Add a threat triage rule that applies to the measurements coming out of
the Profiler. Run the following in a Stellar REPL.
```
# fetch the current config
conf := CONFIG_GET("ENRICHMENT", "profiles")
# create a rule
rule := profileName == "test" and value > 100
triage := { "name":"Test Rule", "rule": SHELL_GET_EXPRESSION('rule'),
"score":"10" }
conf := THREAT_TRIAGE_ADD(conf, [triage])
# save the configuration
CONFIG_PUT("ENRICHMENT", conf, "profiles")
```
* Ensure that the threat triage rule is being applied correctly. Look for
a key named `threat.triage.level` when `value > 100`.
```
$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper
node1:2181 --topic indexing | grep profileName
{"profileName":"test","enrichmentsplitterbolt.splitter.end.ts":"1486589135689","enrichmentsplitterbolt.splitter.begin.ts":"1486589135689","groups":[],"is_alert":"true","source.type":"profiles","original_string":"{\"profileName\":\"test\",\"period\":{\"period\":24776485,\"startTimeMillis\":1486589100000,\"durationMillis\":60000},\"groups\":[],\"definition\":{\"result\":\"count\",\"foreach\":\"'global'\",\"init\":{\"count\":\"0\"},\"onlyif\":\"source.type
==
'bro'\",\"profile\":\"test\",\"destination\":[\"hbase\",\"kafka\"],\"update\":{\"count\":\"count
+
1\"},\"groupBy\":[]},\"value\":160,\"entity\":\"global\"}","threatintelsplitterbolt.splitter.end.ts":"1486589135694","threat.triage.level":10.0,"threatinteljoinbolt.joiner.ts":"1486589135697","enrichmentjoinbolt.joiner.ts":"1486589135692","threatintelsplitterbolt.splitter.begin.ts":"1486589135694","value":160,"entity":"global","timestamp":1486589135684}
```
These steps have shown how metrics produced by the Profiler can be triaged
using the existing Threat Triage mechanism in Metron.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/nickwallen/incubator-metron METRON-701
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-metron/pull/449.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #449
----
commit 88d355a7e9168575193fd105c331dbf67351e0a8
Author: Nick Allen <[email protected]>
Date: 2017-02-07T19:48:18Z
METRON-701 Bulk writer expects a field named 'message' that is a JSONObject
commit e4e7e54c8332f9838f36ff2871fc61e9db6c661d
Author: Nick Allen <[email protected]>
Date: 2017-02-07T20:27:58Z
METRON-701 Embedded the profile definition in the profile measurement
itself. Also added a hack because the KafkaBolt requires a JSONObject
commit fde7dfd56e2d555b4658c0abdd7f233aa37817b2
Author: Nick Allen <[email protected]>
Date: 2017-02-08T14:19:20Z
METRON-701 Moved handling of default value of 'onlyif' to the ProfileConfig
class
commit aa52e98327efaead5d9a2169a2d4e39be8889dd4
Author: Nick Allen <[email protected]>
Date: 2017-02-08T15:50:13Z
METRON-701 kafka writer expects a JSONObject
commit c04d8a326b302f207e6fef2842cad4e4f8f45bae
Author: Nick Allen <[email protected]>
Date: 2017-02-08T17:04:08Z
METRON-701 Introduced the DestinationHandler to simplify the differences
between downstream destinations like Kafka and HBase
commit f46a5d196f9ee0d0cbea70b36caf60fe66716122
Author: Nick Allen <[email protected]>
Date: 2017-02-08T17:04:27Z
METRON-701 The message should not be used in field grouping
commit a81d3120d4a7832bec879ec3f861670ab49e5aab
Author: Nick Allen <[email protected]>
Date: 2017-02-08T21:36:13Z
METRON-701 Properly serializing the ProfileMeasurement. Added
documentation for new options
commit 6593481840d2348b7c86630f8d50cf755fa2b6f5
Author: Nick Allen <[email protected]>
Date: 2017-02-08T21:57:19Z
METRON-701 Fixed up integration test
commit c4744aadd224917c88dc409882024b27754b6836
Author: Nick Allen <[email protected]>
Date: 2017-02-09T14:11:59Z
METRON-701 Added check for invalid destinations
commit fe136d8adc0e6b6b67bc12e259088b76c5d6eecc
Author: Nick Allen <[email protected]>
Date: 2017-02-09T14:15:22Z
METRON-701 Fixed exception message
commit 6b221df9fe206f225953df428e0e28fba3541aab
Author: Nick Allen <[email protected]>
Date: 2017-02-09T14:22:18Z
METRON-701 Allowing the streamId to be set, just in case.
commit 152b75c3e38e1fde82c004209cce7d81a6529629
Author: Nick Allen <[email protected]>
Date: 2017-02-09T17:11:15Z
METRON-701 Rename ttl
commit 8cc8008e208cc12a2399443ca6f8abe949d952e1
Author: Nick Allen <[email protected]>
Date: 2017-02-09T19:07:38Z
Added README example 4 to integration tests. Added missing license headers
commit e766c4806033387d50b1992a5a0dff9814b19370
Author: Nick Allen <[email protected]>
Date: 2017-02-09T20:07:40Z
METRON-701 Fixed dependencies list
commit 5d329e64d89ed747d727a6cc2b8a1489f4db2dde
Author: Nick Allen <[email protected]>
Date: 2017-02-09T20:28:37Z
METRON-701 Fix up comments
----
> Triage Metrics Produced by the Profiler
> ---------------------------------------
>
> Key: METRON-701
> URL: https://issues.apache.org/jira/browse/METRON-701
> Project: Metron
> Issue Type: Improvement
> Reporter: Nick Allen
> Assignee: Nick Allen
>
> h3. Problem
> The motivating example is that I would like to create an alert if the number
> of inbound flows to any host over a 15 minute interval is abnormal.
> The value being interrogated here, the number of inbound flows, is not a
> static value contained within any single telemetry message. This value is
> calculated across multiple messages by the Profiler. The current Threat
> Triage process cannot be used to interrogate values calculated by the
> Profiler.
> h3. Proposed Solution
> I am proposing that we treat the Profiler as a source of telemetry. The
> measurements captured by the Profiler would be enqueued into a Kafka topic.
> We would then treat those Profiler messages like any other telemetry. We
> would parse, enrich, triage, and index those messages.
> This would have the following advantages.
> 1. We would be able to reuse the same threat triage mechanism for values
> calculated by the Profiler.
> 2. We would be able to generate profiles from the profiled data - aka
> meta-profiles anyone?
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)