Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103493583
  
    --- Diff: 
metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
 ---
    @@ -0,0 +1,106 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, 
Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  /**
    +   * The 'source.type' of messages originating from the Profiler.
    +   */
    +  private String sourceType = "profiler";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector 
collector) {
    +
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", 
measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    +    message.put("source.type", sourceType);
    +    message.put("is_alert", "true");
    +
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    +
    +      } else {
    +        throw new IllegalArgumentException(String.format("invalid type for 
triage value: profile=%s, entity=%s, key=%s, value=%s",
    --- End diff --
    
    @cestella asked...
    
    > Also, if someone tries to submit something that JSON can't handle (like a 
stats object), will it get dropped or will an exception occur?
    
    Good question.  The type check for the `triage` block occurs here in 
`KafkaDestinationHandler`.  This is called by `ProfileBuilderBolt.handleTick` 
when a tick tuple is received.  The `KafkaDestinationHandler` throws an 
exception which is then caught and logged by the bolt.
    
    I was originally thinking this was OK, but after looking over this again, 
an exception here would impact any of the Profile-Entity pairs being managed by 
the same `ProfileBuilderBolt` instance.  I was originally thinking that only 
the specific problematic profile would be impacted, but this is not the case.  
    
    I think I need to change this to log the error and move on to the next 
triage expression.  Of course, as a user I'd like to know explicitly if I make 
this mistake.  Hmm... I'll have to noodle on this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to