Repository: metron Updated Branches: refs/heads/master 438893b78 -> 62d1a1bf7
METRON-1494 Profiler Emits Messages to Kafka When Not Needed (nickwallen) closes apache/metron#967 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/62d1a1bf Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/62d1a1bf Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/62d1a1bf Branch: refs/heads/master Commit: 62d1a1bf7e8b9b3ee2f260c358719ea5080c9045 Parents: 438893b Author: nickwallen <n...@nickallen.org> Authored: Wed Apr 11 17:57:09 2018 -0400 Committer: nickallen <nickal...@apache.org> Committed: Wed Apr 11 17:57:09 2018 -0400 ---------------------------------------------------------------------- .../metron/profiler/DefaultProfileBuilder.java | 5 + .../bolt/FixedFrequencyFlushSignal.java | 13 +- .../metron/profiler/bolt/HBaseEmitter.java | 12 +- .../metron/profiler/bolt/KafkaEmitter.java | 78 +++++-- .../profiler/bolt/ProfileSplitterBolt.java | 5 + .../metron/profiler/bolt/HBaseEmitterTest.java | 120 +++++++++++ .../metron/profiler/bolt/KafkaEmitterTest.java | 201 +++++++++++++------ 7 files changed, 358 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java index 4b564c9..66034ac 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java @@ -124,8 +124,13 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { */ @Override public void apply(JSONObject message, long timestamp) { + LOG.debug("Applying message to profile; profile={}, entity={}, timestamp={}", + profileName, entity, timestamp); + try { if (!isInitialized()) { + LOG.debug("Initializing profile; profile={}, entity={}, timestamp={}", + profileName, entity, timestamp); // execute each 'init' expression assign(definition.getInit(), message, "init"); http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java index b9f57dd..8c0a0b1 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignal.java @@ -94,7 +94,8 @@ public class FixedFrequencyFlushSignal implements FlushSignal { // set the next time to flush flushTime = currentTime + flushFrequency; - LOG.debug("Setting flush time; flushTime={}, currentTime={}, flushFreq={}", + LOG.debug("Setting flush time; '{}' ms until flush; flushTime={}, currentTime={}, flushFreq={}", + timeToNextFlush(), flushTime, currentTime, flushFrequency); @@ -112,7 +113,7 @@ public class FixedFrequencyFlushSignal implements FlushSignal { boolean flush = currentTime > flushTime; LOG.debug("Flush={}, '{}' ms until flush; currentTime={}, flushTime={}", flush, - flush ? 0 : (flushTime-currentTime), + timeToNextFlush(), currentTime, flushTime); @@ -123,4 +124,12 @@ public class FixedFrequencyFlushSignal implements FlushSignal { public long currentTimeMillis() { return currentTime; } + + /** + * Returns the number of milliseconds to the next flush. + * @return The time left until the next flush. + */ + private long timeToNextFlush() { + return Math.max(0, flushTime - currentTime); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java index 8e1229a..e4e3552 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseEmitter.java @@ -40,7 +40,7 @@ public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable { /** * The stream identifier used for this destination; */ - private String streamId = "hbase"; + private String streamId = "hbase"; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { @@ -49,7 +49,17 @@ public class HBaseEmitter implements ProfileMeasurementEmitter, Serializable { @Override public void emit(ProfileMeasurement measurement, OutputCollector collector) { + + // measurements are always emitted to hbase collector.emit(getStreamId(), new Values(measurement)); + + LOG.debug("Emitted measurement; stream={}, profile={}, entity={}, period={}, start={}, end={}", + getStreamId(), + measurement.getProfileName(), + measurement.getEntity(), + measurement.getPeriod().getPeriod(), + measurement.getPeriod().getStartTimeMillis(), + measurement.getPeriod().getEndTimeMillis()); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java index 29d1a49..87920da 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaEmitter.java @@ -19,8 +19,7 @@ package org.apache.metron.profiler.bolt; -import java.io.Serializable; -import java.lang.invoke.MethodHandles; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.metron.profiler.ProfileMeasurement; import org.apache.storm.task.OutputCollector; @@ -31,6 +30,10 @@ import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.Map; + /** * Responsible for emitting a {@link ProfileMeasurement} to an output stream that will * persist data in HBase. @@ -58,19 +61,48 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable { @Override public void emit(ProfileMeasurement measurement, OutputCollector collector) { - JSONObject message = new JSONObject(); - message.put("profile", measurement.getDefinition().getProfile()); - message.put("entity", measurement.getEntity()); - message.put("period", measurement.getPeriod().getPeriod()); - message.put("period.start", measurement.getPeriod().getStartTimeMillis()); - message.put("period.end", measurement.getPeriod().getEndTimeMillis()); - message.put("timestamp", System.currentTimeMillis()); - message.put("source.type", sourceType); - message.put("is_alert", "true"); + // only need to emit, if there are triage values + Map<String, Object> triageValues = measurement.getTriageValues(); + if(MapUtils.isNotEmpty(triageValues)) { + + JSONObject message = createMessage(measurement); + appendTriageValues(measurement, message); + collector.emit(getStreamId(), new Values(message)); + + LOG.debug("Emitted measurement; stream={}, profile={}, entity={}, period={}, start={}, end={}", + getStreamId(), + measurement.getProfileName(), + measurement.getEntity(), + measurement.getPeriod().getPeriod(), + measurement.getPeriod().getStartTimeMillis(), + measurement.getPeriod().getEndTimeMillis()); + + } else { + + LOG.debug("No triage values, nothing to emit; stream={}, profile={}, entity={}, period={}, start={}, end={}", + getStreamId(), + measurement.getProfileName(), + measurement.getEntity(), + measurement.getPeriod().getPeriod(), + measurement.getPeriod().getStartTimeMillis(), + measurement.getPeriod().getEndTimeMillis()); + } + } - // append each of the triage values to the message - measurement.getTriageValues().forEach((key, value) -> { + /** + * Appends triage values obtained from a {@code ProfileMeasurement} to the + * outgoing message. + * + * @param measurement The measurement that may contain triage values. + * @param message The message that the triage values are appended to. + */ + private void appendTriageValues(ProfileMeasurement measurement, JSONObject message) { + + // for each triage value... + Map<String, Object> triageValues = MapUtils.emptyIfNull(measurement.getTriageValues()); + triageValues.forEach((key, value) -> { + // append the triage value to the message if(isValidType(value)) { message.put(key, value); @@ -83,8 +115,26 @@ public class KafkaEmitter implements ProfileMeasurementEmitter, Serializable { key)); } }); + } + + /** + * Creates a message that will be emitted to Kafka. + * + * @param measurement The profile measurement used as a basis for the message. + * @return A message that can be emitted to Kafka. + */ + private JSONObject createMessage(ProfileMeasurement measurement) { - collector.emit(getStreamId(), new Values(message)); + JSONObject message = new JSONObject(); + message.put("profile", measurement.getDefinition().getProfile()); + message.put("entity", measurement.getEntity()); + message.put("period", measurement.getPeriod().getPeriod()); + message.put("period.start", measurement.getPeriod().getStartTimeMillis()); + message.put("period.end", measurement.getPeriod().getEndTimeMillis()); + message.put("timestamp", System.currentTimeMillis()); + message.put("source.type", sourceType); + message.put("is_alert", "true"); + return message; } /** http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java index a92a432..f28411f 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java @@ -190,6 +190,11 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt { Values values = createValues(message, timestamp, route); collector.emit(input, values); + + LOG.debug("Found route for message; profile={}, entity={}, timestamp={}", + route.getProfileDefinition().getProfile(), + route.getEntity(), + timestamp); } LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp); http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java new file mode 100644 index 0000000..35ca4d9 --- /dev/null +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.bolt; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests the HBaseEmitter class. + */ +public class HBaseEmitterTest { + + /** + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + */ + @Multiline + private String profileDefinition; + + private HBaseEmitter emitter; + private ProfileConfig profile; + private OutputCollector collector; + + @Before + public void setup() throws Exception { + emitter = new HBaseEmitter(); + profile = createDefinition(profileDefinition); + collector = Mockito.mock(OutputCollector.class); + } + + /** + * The handler should emit a message containing the result of executing + * the 'result/profile' expression. + */ + @Test + public void testEmit() throws Exception { + + // create a measurement that has triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withDefinition(profile) + .withProfileValue(22); + + // execute the test + emitter.emit(measurement, collector); + + // the measurement should be emitted as-is + ProfileMeasurement actual = expectMeasurement(emitter, collector); + assertEquals(measurement, actual); + } + + /** + * Verifies that the emitter does emit a {@code ProfileMeasurement}. + * + * @return The {@code ProfileMeasurement} that was emitted + */ + private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, OutputCollector collector) { + + ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof ProfileMeasurement); + return (ProfileMeasurement) values.get(0); + } + + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. + */ + private ProfileConfig createDefinition(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/62d1a1bf/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java index b02e377..95a2d29 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java @@ -43,6 +43,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -58,54 +59,128 @@ public class KafkaEmitterTest { * "foreach": "ip_src_addr", * "init": { "x": "0" }, * "update": { "x": "x + 1" }, - * "result": "x" + * "result": { + * "profile": "x", + * "triage": { + * "value": "x" + * } + * } * } */ @Multiline - private String profileDefinition; + private String profileDefinitionWithTriage; - private KafkaEmitter handler; + private KafkaEmitter kafkaEmitter; private ProfileConfig profile; private OutputCollector collector; @Before public void setup() throws Exception { - handler = new KafkaEmitter(); - profile = createDefinition(profileDefinition); + kafkaEmitter = new KafkaEmitter(); + profile = createDefinition(profileDefinitionWithTriage); collector = Mockito.mock(OutputCollector.class); } /** - * The handler must serialize the ProfileMeasurement into a JSONObject. + * The handler should emit a message when a result/triage expression(s) has been defined. */ @Test - public void testSerialization() throws Exception { + public void testEmit() throws Exception { + // create a measurement that has triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withDefinition(profile) + .withTriageValues(Collections.singletonMap("triage-key", "triage-value")); + + // execute the test + kafkaEmitter.emit(measurement, collector); + + // a message should be emitted + verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any()); + } + + /** + * The handler should NOT emit a message when there is NO result/triage value(s). + */ + @Test + public void testDoNotEmit() throws Exception { + + // create a measurement with NO triage values ProfileMeasurement measurement = new ProfileMeasurement() .withProfileName("profile") .withEntity("entity") .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", "triage-value")) .withDefinition(profile); - handler.emit(measurement, collector); - ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); + // execute the test + kafkaEmitter.emit(measurement, collector); - // expect a JSONObject - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); + // a message should NOT be emitted + verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any()); + } - // validate the json - JSONObject actual = (JSONObject) values.get(0); - assertEquals(measurement.getDefinition().getProfile(), actual.get("profile")); - assertEquals(measurement.getEntity(), actual.get("entity")); - assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); - assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); - assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + /** + * Validate that the message generated for Kafka should include the triage value. + */ + @Test + public void testTriageValueInMessage() throws Exception { + + // create a measurement that has triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", "triage-value")); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // validate the core parts of the message + assertEquals(measurement.getProfileName(), actual.get("profile")); + assertEquals(measurement.getEntity(), actual.get("entity")); + assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); + assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); + assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); + assertEquals("profiler", actual.get("source.type")); assertNotNull(actual.get("timestamp")); - assertEquals("profiler", actual.get("source.type")); + + // validate that the triage value has been added + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + } + + /** + * Validate that the message generated for Kafka can include multiple triage values. + */ + @Test + public void testMultipleTriageValueInMessage() throws Exception { + + // multiple triage values have been defined + Map<String, Object> triageValues = ImmutableMap.of( + "x", 2, + "y", "4", + "z", 6.0); + + // create a measurement that has multiple triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(triageValues); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // validate that ALL of the triage values have been added + assertEquals(measurement.getTriageValues().get("x"), actual.get("x")); + assertEquals(measurement.getTriageValues().get("y"), actual.get("y")); + assertEquals(measurement.getTriageValues().get("z"), actual.get("z")); } /** @@ -120,30 +195,27 @@ public class KafkaEmitterTest { "invalid", new OnlineStatisticsProvider(), "valid", 4); + // create the measurement with a Map as a triage value; this is not allowed ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") + .withDefinition(profile) + .withProfileName(profile.getProfile()) .withEntity("entity") .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(triageValues) - .withDefinition(profile); - handler.emit(measurement, collector); + .withTriageValues(triageValues); - ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); - // only the triage expression value itself should have been skipped, all others should be there - JSONObject actual = (JSONObject) values.get(0); - assertEquals(measurement.getDefinition().getProfile(), actual.get("profile")); - assertEquals(measurement.getEntity(), actual.get("entity")); - assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); - assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); - assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); - assertNotNull(actual.get("timestamp")); - assertEquals("profiler", actual.get("source.type")); + // validate the core parts of the message still exist + assertEquals(measurement.getProfileName(), actual.get("profile")); + assertEquals(measurement.getEntity(), actual.get("entity")); + assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); + assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); + assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); + assertEquals("profiler", actual.get("source.type")); - // the invalid expression should be skipped due to invalid type + // the invalid expression should be skipped and not included in the message assertFalse(actual.containsKey("invalid")); // but the valid expression should still be there @@ -156,19 +228,18 @@ public class KafkaEmitterTest { */ @Test public void testIntegerIsValidType() throws Exception { + + // create a measurement with a triage value that is an integer ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") + .withDefinition(profile) + .withProfileName(profile.getProfile()) .withEntity("entity") .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", 123)) - .withDefinition(profile); - handler.emit(measurement, collector); + .withTriageValues(Collections.singletonMap("triage-key", 123)); - ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - JSONObject actual = (JSONObject) values.get(0); + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); // the triage expression is valid assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); @@ -180,25 +251,37 @@ public class KafkaEmitterTest { */ @Test public void testStringIsValidType() throws Exception { + + // create a measurement with a triage value that is a string ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") + .withDefinition(profile) + .withProfileName(profile.getProfile()) .withEntity("entity") .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", "value")) - .withDefinition(profile); - handler.emit(measurement, collector); + .withTriageValues(Collections.singletonMap("triage-key", "value")); - ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - JSONObject actual = (JSONObject) values.get(0); + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); // the triage expression is valid assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); } /** + * Verifies that the KafkaEmitter does emit a JSONObject. + * @return The JSONObject that was emitted + */ + private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, OutputCollector collector) { + + ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof JSONObject); + return (JSONObject) values.get(0); + } + + /** * Creates a profile definition based on a string of JSON. * @param json The string of JSON. */