[https://issues.apache.org/jira/browse/METRON-701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888392#comment-15888392]
ASF GitHub Bot commented on METRON-701:
---------------------------------------
Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/449#discussion_r103496145
--- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
+/**
+ * Handles emitting a ProfileMeasurement to the stream which writes
+ * profile measurements to Kafka.
+ */
+public class KafkaDestinationHandler implements DestinationHandler, Serializable {
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private String streamId = "kafka";
+
+  /**
+   * The 'source.type' of messages originating from the Profiler.
+   */
+  private String sourceType = "profiler";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // the kafka writer expects a field named 'message'
+    declarer.declareStream(getStreamId(), new Fields("message"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+
+    // append each of the triage values to the message
+    measurement.getTriageValues().forEach((key, value) -> {
+
+      if(isValidType(value)) {
+        message.put(key, value);
--- End diff ---
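For readers following the diff, here is a minimal, dependency-free sketch of what `emit` does with a measurement. It is not the Metron code: a `LinkedHashMap` stands in for json-simple's `JSONObject`, plain parameters stand in for `ProfileMeasurement`, and the class name `ProfilerMessageSketch` is hypothetical. Because the diff is cut off at the `isValidType` check, the sketch assumes that only scalar values (boxed primitives and strings) pass that check and that anything else is skipped.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of how a profile measurement is flattened into a
 * Kafka message. A Map stands in for JSONObject; the parameters stand in
 * for ProfileMeasurement. Not the actual KafkaDestinationHandler.
 */
class ProfilerMessageSketch {

  // Assumption: "valid" means a value that serializes as a JSON scalar.
  // The real isValidType in the PR may accept a different set of types.
  static boolean isValidType(Object value) {
    return value instanceof Number
        || value instanceof Boolean
        || value instanceof Character
        || value instanceof String;
  }

  static Map<String, Object> buildMessage(String profile, String entity,
      long period, long startMillis, long endMillis, long nowMillis,
      Map<String, Object> triageValues) {

    // fixed fields, mirroring the message.put(...) calls in the diff
    Map<String, Object> message = new LinkedHashMap<>();
    message.put("profile", profile);
    message.put("entity", entity);
    message.put("period", period);
    message.put("period.start", startMillis);
    message.put("period.end", endMillis);
    message.put("timestamp", nowMillis);
    message.put("source.type", "profiler");
    message.put("is_alert", "true");

    // each triage value becomes a top-level key; non-scalars are skipped
    triageValues.forEach((key, value) -> {
      if (isValidType(value)) {
        message.put(key, value);
      }
    });
    return message;
  }

  public static void main(String[] args) {
    Map<String, Object> triage = new LinkedHashMap<>();
    triage.put("blah", 161);
    triage.put("zork", "zork");
    triage.put("nested", Arrays.asList(1, 2, 3)); // not a scalar, dropped
    System.out.println(buildMessage("test", "global", 24803897L,
        1488233820000L, 1488233880000L, 1488233841600L, triage));
  }
}
```

Under that assumption, a triage expression that evaluated to a nested list or map would simply not appear in the Kafka message.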
@cestella said...
> {
>   "profile": "test",
>   "foreach": "'global'",
>   "onlyif": "source.type == 'bro'",
>   "init": { "count": "0" },
>   "update": { "count": "count + 1" },
>   "result": {
>     "profile": "count",
>     "triage": "{ 'blah' : count, 'zork' : 'zork'}"
>   }
> }
>
> Will I get messages in Kafka that look like:
>
> ```
> {
> "period.start":1488233820000
> ,"period":24803897
> ,"profile":"test"
> ,"blah":161
> ,"zork":"zork"
> ,"period.end":1488233880000
> ,"is_alert":"true"
> ,"entity":"global"
> ,"timestamp":1488233841600
> ,"source.type":"profiler"
> }
> ```
Yes, that is exactly what you would get. The keys 'blah' and 'zork' would
be added to the message, each set to the result of evaluating its
associated triage expression.
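The numbers in the example message are also internally consistent with fixed-width, one-minute periods: `period.end - period.start` is 60000 ms, and `period.start / 60000` is exactly 24803897. A small sketch of that arithmetic follows, assuming (not confirmed in this thread) that the Profiler derives a period index as `floor(epochMillis / durationMillis)`; the class `ProfilePeriodSketch` is hypothetical.

```java
/**
 * Hypothetical sketch relating "period", "period.start" and "period.end".
 * Assumes period = floor(epochMillis / durationMillis) and a 60-second
 * duration inferred from the example message.
 */
class ProfilePeriodSketch {

  // index of the fixed-width period that contains epochMillis
  static long periodIndex(long epochMillis, long durationMillis) {
    return Math.floorDiv(epochMillis, durationMillis);
  }

  // first millisecond of the period
  static long startMillis(long period, long durationMillis) {
    return period * durationMillis;
  }

  // first millisecond of the following period (exclusive end)
  static long endMillis(long period, long durationMillis) {
    return startMillis(period, durationMillis) + durationMillis;
  }

  public static void main(String[] args) {
    long duration = 60_000L;          // 1 minute, inferred from the example
    long timestamp = 1488233841600L;  // "timestamp" from the example message
    long period = periodIndex(timestamp, duration);
    System.out.println(period);                        // 24803897
    System.out.println(startMillis(period, duration)); // 1488233820000
    System.out.println(endMillis(period, duration));   // 1488233880000
  }
}
```

Note that `timestamp` is set from `System.currentTimeMillis()` when the message is emitted, so it is not guaranteed to fall inside the measured period; in this example it happens to.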
> Triage Metrics Produced by the Profiler
> ---------------------------------------
>
> Key: METRON-701
> URL: https://issues.apache.org/jira/browse/METRON-701
> Project: Metron
> Issue Type: Improvement
> Reporter: Nick Allen
> Assignee: Nick Allen
>
> h3. Problem
> The motivating example is that I would like to create an alert if the number
> of inbound flows to any host over a 15-minute interval is abnormal.
> The value being interrogated here, the number of inbound flows, is not a
> static value contained within any single telemetry message. This value is
> calculated across multiple messages by the Profiler. The current Threat
> Triage process cannot be used to interrogate values calculated by the
> Profiler.
> h3. Proposed Solution
> I am proposing that we treat the Profiler as a source of telemetry. The
> measurements captured by the Profiler would be enqueued into a Kafka topic.
> We would then treat those Profiler messages like any other telemetry. We
> would parse, enrich, triage, and index those messages.
> This would have the following advantages:
> 1. We would be able to reuse the same threat triage mechanism for values
> calculated by the Profiler.
> 2. We would be able to generate profiles from the profiled data
> (meta-profiles, anyone?).
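To make advantage 1 concrete: once a measurement is just another message, a triage-style rule only needs to read message fields. The following is a hypothetical sketch, not Metron's threat triage API; the rule shape (a predicate plus a score), the `inbound_flows` field, the entity value, and the threshold of 1000 are all illustrative assumptions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: scoring Profiler output with generic, field-based
 * rules. NOT Metron's threat triage API; names and values are illustrative.
 */
class ProfilerTriageSketch {

  // a rule is a condition on the message plus the score it contributes
  static final class Rule {
    final String name;
    final Predicate<Map<String, Object>> condition;
    final double score;

    Rule(String name, Predicate<Map<String, Object>> condition, double score) {
      this.name = name;
      this.condition = condition;
      this.score = score;
    }
  }

  // sum the scores of every rule whose condition matches the message
  static double triage(Map<String, Object> message, List<Rule> rules) {
    double total = 0.0;
    for (Rule rule : rules) {
      if (rule.condition.test(message)) {
        total += rule.score;
      }
    }
    return total;
  }

  // read a numeric field, treating absent or non-numeric values as 0
  static double number(Map<String, Object> message, String key) {
    Object v = message.get(key);
    return (v instanceof Number) ? ((Number) v).doubleValue() : 0.0;
  }

  public static void main(String[] args) {
    // a message shaped like the Profiler output discussed above
    Map<String, Object> message = new HashMap<>();
    message.put("source.type", "profiler");
    message.put("profile", "inbound-flows");  // hypothetical profile name
    message.put("entity", "10.0.0.1");         // hypothetical host
    message.put("inbound_flows", 5000);        // hypothetical triage value

    List<Rule> rules = Arrays.asList(
        new Rule("too many inbound flows",
            m -> "profiler".equals(m.get("source.type"))
                && number(m, "inbound_flows") > 1000,  // assumed threshold
            75.0));

    System.out.println(triage(message, rules)); // 75.0
  }
}
```

Nothing in the rule is Profiler-specific beyond the `source.type` check, which is what would let the existing parse, enrich, and triage path handle Profiler messages like any other telemetry.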
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)