GitHub user nickwallen commented on a diff in the pull request:

https://github.com/apache/incubator-metron/pull/449#discussion_r103493583

--- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+
+/**
+ * Handles emitting a ProfileMeasurement to the stream which writes
+ * profile measurements to Kafka.
+ */
+public class KafkaDestinationHandler implements DestinationHandler, Serializable {
+
+  /**
+   * The stream identifier used for this destination;
+   */
+  private String streamId = "kafka";
+
+  /**
+   * The 'source.type' of messages originating from the Profiler.
+   */
+  private String sourceType = "profiler";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // the kafka writer expects a field named 'message'
+    declarer.declareStream(getStreamId(), new Fields("message"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+
+    // append each of the triage values to the message
+    measurement.getTriageValues().forEach((key, value) -> {
+
+      if(isValidType(value)) {
+        message.put(key, value);
+
+      } else {
+        throw new IllegalArgumentException(String.format("invalid type for triage value: profile=%s, entity=%s, key=%s, value=%s",
--- End diff --

@cestella asked...

> Also, if someone tries to submit something that JSON can't handle (like a stats object), will it get dropped or will an exception occur?

Good question. The type check for the `triage` block happens here in `KafkaDestinationHandler`, which is called by `ProfileBuilderBolt.handleTick` when a tick tuple is received. `KafkaDestinationHandler` throws an exception, which the bolt then catches and logs.

I originally thought this was OK. After looking it over again, though, an exception here would impact any of the Profile-Entity pairs being managed by the same `ProfileBuilderBolt` instance, not just the specific profile that produced the bad value. I think I need to change this to log the error and move on to the next triage expression.
Of course, as a user I'd like to know explicitly if I make this mistake. Hmm... I'll have to noodle on this.
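A minimal sketch of the "log the error and move on" option follows, written as a standalone class so it can run outside Storm. Everything in it is hypothetical: the names `TriageValueFilter` and `appendTriageValues` are made up for illustration, a plain `Map<String, Object>` stands in for the `JSONObject` message, `java.util.logging` stands in for whatever logger the bolt actually uses, and the type rule in `isValidType` (null, strings, numbers, booleans) is an assumption rather than the check in this PR. Returning the rejected keys is one possible way to keep the mistake visible to the user instead of burying it in a log.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Hypothetical sketch: copy triage values into an outgoing message,
 * logging and skipping any value JSON cannot represent instead of throwing.
 */
class TriageValueFilter {

  private static final Logger LOG = Logger.getLogger(TriageValueFilter.class.getName());

  /**
   * Assumed type rule (not necessarily the PR's actual isValidType):
   * null, strings, numbers and booleans serialize cleanly to JSON.
   */
  static boolean isValidType(Object value) {
    return value == null
        || value instanceof String
        || value instanceof Number
        || value instanceof Boolean;
  }

  /**
   * Copies each valid triage value into the message. Invalid values are
   * logged and skipped, so one bad expression never aborts the rest.
   * Returns the rejected keys so a caller could surface them to the user.
   */
  static List<String> appendTriageValues(String profile, String entity,
                                         Map<String, Object> triageValues,
                                         Map<String, Object> message) {
    List<String> rejected = new ArrayList<>();
    triageValues.forEach((key, value) -> {
      if (isValidType(value)) {
        message.put(key, value);
      } else {
        // value is non-null here, because null passes isValidType
        rejected.add(key);
        LOG.warning(String.format(
            "skipping invalid type for triage value: profile=%s, entity=%s, key=%s, type=%s",
            profile, entity, key, value.getClass().getName()));
      }
    });
    return rejected;
  }

  public static void main(String[] args) {
    Map<String, Object> triage = new LinkedHashMap<>();
    triage.put("mean", 4.2);
    triage.put("stats", new Object());  // stands in for a stats object JSON can't handle
    triage.put("label", "high");

    Map<String, Object> message = new LinkedHashMap<>();
    List<String> rejected = appendTriageValues("example-profile", "10.0.0.1", triage, message);

    // prints "kept=[mean, label] rejected=[stats]" (the warning goes to stderr)
    System.out.println("kept=" + message.keySet() + " rejected=" + rejected);
  }
}
```

Because the loop never throws, the other Profile-Entity pairs handled in the same tick are unaffected. The trade-off is the one raised above: unless the caller does something with the returned keys, the user only learns about the mistake from the logs.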