[ https://issues.apache.org/jira/browse/METRON-701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15888392#comment-15888392 ]

ASF GitHub Bot commented on METRON-701:
---------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103496145
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,106 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination.
    +   */
    +  private String streamId = "kafka";
    +
    +  /**
    +   * The 'source.type' of messages originating from the Profiler.
    +   */
    +  private String sourceType = "profiler";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    +    message.put("source.type", sourceType);
    +    message.put("is_alert", "true");
    +
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    --- End diff --
    
    @cestella said...
    
    >     {
    >       "profile": "test",
    >       "foreach": "'global'",
    >       "onlyif": "source.type == 'bro'",
    >       "init":    { "count": "0" },
    >       "update":  { "count": "count + 1" },
    >       "result": {
    >           "profile": "count",
    >           "triage": "{ 'blah' : count, 'zork' : 'zork'}"
    >       }
    > 
    > Will I get messages in kafka that look like:
    > 
    > ```
    > {
    > "period.start":1488233820000
    > ,"period":24803897
    > ,"profile":"test"
    > ,"blah":161
    > ,"zork":"zork"
    > ,"period.end":1488233880000
    > ,"is_alert":"true"
    > ,"entity":"global"
    > ,"timestamp":1488233841600
    > ,"source.type":"profiler"
    > }
    > ```
    
    Yes, that is exactly what you would get.  The keys 'blah' and 'zork' would
    be added to the message, each with the result of evaluating its associated
    triage expression.
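
    For illustration only, here is a rough standalone sketch of the
    message-building step in emit().  It uses a plain Map in place of
    JSONObject so that it runs without Storm or json-simple.  The class name
    TriageMessageSketch and the buildMessage helper are hypothetical, and the
    body of isValidType is an assumption, since the diff above is cut off
    before that method is defined.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch only: mirrors the message-building step of emit() with a plain Map
 * standing in for JSONObject. Not the code from the pull request.
 */
class TriageMessageSketch {

  // ASSUMPTION: the real isValidType is not visible in the diff. This version
  // accepts strings, numbers, booleans, and characters, and rejects null.
  static boolean isValidType(Object value) {
    return value instanceof String
        || value instanceof Number
        || value instanceof Boolean
        || value instanceof Character;
  }

  // Hypothetical helper: takes the measurement's fields as plain arguments
  // instead of a ProfileMeasurement.
  static Map<String, Object> buildMessage(String profile, String entity, long period,
      long periodStart, long periodEnd, Map<String, Object> triageValues) {

    Map<String, Object> message = new LinkedHashMap<>();
    message.put("profile", profile);
    message.put("entity", entity);
    message.put("period", period);
    message.put("period.start", periodStart);
    message.put("period.end", periodEnd);
    message.put("timestamp", System.currentTimeMillis());
    message.put("source.type", "profiler");
    message.put("is_alert", "true");

    // append each of the triage values to the message, skipping invalid types
    triageValues.forEach((key, value) -> {
      if (isValidType(value)) {
        message.put(key, value);
      }
    });
    return message;
  }

  public static void main(String[] args) {
    // triage values as they might look after evaluating
    // "{ 'blah' : count, 'zork' : 'zork'}" with count == 161
    Map<String, Object> triage = new LinkedHashMap<>();
    triage.put("blah", 161);
    triage.put("zork", "zork");
    triage.put("skipped", new Object()); // dropped by isValidType

    System.out.println(buildMessage("test", "global", 24803897L,
        1488233820000L, 1488233880000L, triage));
  }
}
```

    With the triage values from the example above, 'blah' and 'zork' land in
    the message as top-level keys next to the fixed fields, and any value
    that fails the type check is left out.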


> Triage Metrics Produced by the Profiler
> ---------------------------------------
>
>                 Key: METRON-701
>                 URL: https://issues.apache.org/jira/browse/METRON-701
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Nick Allen
>            Assignee: Nick Allen
>
> h3. Problem
> The motivating example is that I would like to create an alert if the number 
> of inbound flows to any host over a 15 minute interval is abnormal.  
> The value being interrogated here, the number of inbound flows, is not a 
> static value contained within any single telemetry message.  This value is 
> calculated across multiple messages by the Profiler.  The current Threat 
> Triage process cannot be used to interrogate values calculated by the 
> Profiler.
> h3. Proposed Solution
> I am proposing that we treat the Profiler as a source of telemetry.   The 
> measurements captured by the Profiler would be enqueued into a Kafka topic.  
> We would then treat those Profiler messages like any other telemetry.  We 
> would parse, enrich, triage, and index those messages.
> This would have the following advantages.
> 1.  We would be able to reuse the same threat triage mechanism for values 
> calculated by the Profiler.
> 2.  We would be able to generate profiles from the profiled data - aka 
> meta-profiles anyone? 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
