METRON-1791 Add GUID to Messages Produced by Profiler (nickwallen) closes apache/metron#1210
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7e222fa4 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7e222fa4 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7e222fa4 Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: 7e222fa47aedc4ab3bafec54590ec0bc73b5f75c Parents: b872fdc Author: nickwallen <n...@nickallen.org> Authored: Thu Oct 4 17:45:32 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Thu Oct 4 17:45:32 2018 -0400 ---------------------------------------------------------------------- .../metron/profiler/storm/KafkaEmitter.java | 31 +++++++++--- .../zookeeper/triage-result/profiler.json | 20 ++++++++ .../metron/profiler/storm/KafkaEmitterTest.java | 4 ++ .../integration/ProfilerIntegrationTest.java | 52 ++++++++++++++------ 4 files changed, 85 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java index af1fbca..adbde1b 100644 --- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java +++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/profiler/storm/KafkaEmitter.java @@ -21,6 +21,7 @@ package org.apache.metron.profiler.storm; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ClassUtils; +import org.apache.metron.common.Constants; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.storm.task.OutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.util.Map; +import java.util.UUID; /** * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will @@ -42,6 +44,14 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable { protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String PROFILE_FIELD = "profile"; + public static final String ENTITY_FIELD = "entity"; + public static final String PERIOD_ID_FIELD = "period"; + public static final String PERIOD_START_FIELD = "period.start"; + public static final String PERIOD_END_FIELD = "period.end"; + public static final String TIMESTAMP_FIELD = "timestamp"; + public static final String ALERT_FIELD = "is_alert"; + /** * The stream identifier used for this destination; */ @@ -126,14 +136,15 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable { private JSONObject createMessage(ProfileMeasurement measurement) { JSONObject message = new JSONObject(); - message.put("profile", measurement.getDefinition().getProfile()); - message.put("entity", measurement.getEntity()); - message.put("period", measurement.getPeriod().getPeriod()); - message.put("period.start", measurement.getPeriod().getStartTimeMillis()); - message.put("period.end", measurement.getPeriod().getEndTimeMillis()); - message.put("timestamp", System.currentTimeMillis()); - message.put("source.type", sourceType); - message.put("is_alert", "true"); + message.put(PROFILE_FIELD, measurement.getDefinition().getProfile()); + message.put(ENTITY_FIELD, measurement.getEntity()); + message.put(PERIOD_ID_FIELD, measurement.getPeriod().getPeriod()); + message.put(PERIOD_START_FIELD, measurement.getPeriod().getStartTimeMillis()); + message.put(PERIOD_END_FIELD, measurement.getPeriod().getEndTimeMillis()); + message.put(TIMESTAMP_FIELD, System.currentTimeMillis()); + message.put(Constants.SENSOR_TYPE, sourceType); + message.put(ALERT_FIELD, "true"); + message.put(Constants.GUID, UUID.randomUUID().toString()); return message; } @@ -158,6 +169,10 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable { this.streamId = streamId; } + public String getSourceType() { + return sourceType; + } + public void setSourceType(String sourceType) { this.sourceType = sourceType; } http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json new file mode 100644 index 0000000..7d63da7 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/triage-result/profiler.json @@ -0,0 +1,20 @@ +{ + "profiles": [ + { + "profile": "profile-with-triage", + "foreach": "'global'", + "update": { + "stats": "STATS_ADD(stats, 1)" + }, + "result": { + "profile": "stats", + "triage": { + "min": "STATS_MIN(stats)", + "max": "STATS_MAX(stats)", + "mean": "STATS_MEAN(stats)" + } + } + } + ], + "timestampField": "timestamp" +} http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java index 51ca3a4..86849ac 100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java @@ -22,6 +22,7 @@ package org.apache.metron.profiler.storm; import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.profiler.ProfileMeasurement; @@ -148,6 +149,7 @@ public class KafkaEmitterTest { assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); assertEquals("profiler", actual.get("source.type")); assertNotNull(actual.get("timestamp")); + assertNotNull(actual.get(Constants.GUID)); // validate that the triage value has been added assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); @@ -214,6 +216,8 @@ public class KafkaEmitterTest { assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); assertEquals("profiler", actual.get("source.type")); + assertNotNull(actual.get("timestamp")); + assertNotNull(actual.get(Constants.GUID)); // the invalid expression should be skipped and not included in the message assertFalse(actual.containsKey("invalid")); http://git-wip-us.apache.org/repos/asf/metron/blob/7e222fa4/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java index 4389d42..64a1482 100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java @@ -40,6 +40,8 @@ import org.apache.metron.stellar.common.StellarStatefulExecutor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; import org.apache.storm.Config; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -49,7 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.UnsupportedEncodingException; import java.lang.invoke.MethodHandles; import java.util.Arrays; import java.util.Collections; @@ -68,6 +69,13 @@ import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PRO import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS; import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR; +import static org.apache.metron.profiler.storm.KafkaEmitter.ALERT_FIELD; +import static org.apache.metron.profiler.storm.KafkaEmitter.ENTITY_FIELD; +import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_END_FIELD; +import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_ID_FIELD; +import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_START_FIELD; +import static org.apache.metron.profiler.storm.KafkaEmitter.PROFILE_FIELD; +import static org.apache.metron.profiler.storm.KafkaEmitter.TIMESTAMP_FIELD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -285,18 +293,35 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { assertTrue(results.get(0) instanceof OnlineStatisticsProvider); } - /** - * Generates an error message for if the byte comparison fails. - * - * @param expected The expected value. - * @param actual The actual value. - * @return - * @throws UnsupportedEncodingException - */ - private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException { - return String.format("expected '%s', got '%s'", - new String(expected, "UTF-8"), - new String(actual, "UTF-8")); + @Test + public void testProfileWithTriageResult() throws Exception { + uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + List<String> telemetry = FileUtils.readLines(new File("src/test/resources/telemetry.json")); + kafkaComponent.writeMessages(inputTopic, telemetry); + + // wait until the triage message is output to kafka + waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90))); + + List<byte[]> outputMessages = kafkaComponent.readMessages(outputTopic); + assertEquals(1, outputMessages.size()); + + // validate the triage message + JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8")); + assertEquals("profile-with-triage", message.get(PROFILE_FIELD)); + assertEquals("global", message.get(ENTITY_FIELD)); + assertEquals(76548935L, message.get(PERIOD_ID_FIELD)); + assertEquals(1530978700000L, message.get(PERIOD_START_FIELD)); + assertEquals(1530978720000L, message.get(PERIOD_END_FIELD)); + assertEquals("profiler", message.get(Constants.SENSOR_TYPE)); + assertEquals("true", message.get(ALERT_FIELD)); + assertEquals(1.0, message.get("min")); + assertEquals(1.0, message.get("max")); + assertEquals(1.0, message.get("mean")); + assertTrue(message.containsKey(TIMESTAMP_FIELD)); + assertTrue(message.containsKey(Constants.GUID)); } private static String getMessage(String ipSource, long timestamp) { @@ -471,7 +496,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { */ private <T> T execute(String expression, Class<T> clazz) { T results = executor.execute(expression, Collections.emptyMap(), clazz); - LOG.debug("{} = {}", expression, results); return results; }