http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java index fc94afa,0000000..3f16edd mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java @@@ -1,356 -1,0 +1,378 @@@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm; + +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.profiler.MessageDistributor; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.storm.integration.MessageBuilder; +import org.apache.metron.test.bolt.BaseBoltTest; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; ++import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests the ProfileBuilderBolt. 
+ */ +public class ProfileBuilderBoltTest extends BaseBoltTest { + + private JSONObject message1; + private JSONObject message2; + private ProfileConfig profile1; + private ProfileConfig profile2; + private ProfileMeasurementEmitter emitter; + private ManualFlushSignal flushSignal; + private ProfileMeasurement measurement; + + @Before + public void setup() throws Exception { + + message1 = new MessageBuilder() + .withField("ip_src_addr", "10.0.0.1") + .withField("value", "22") + .build(); + + message2 = new MessageBuilder() + .withField("ip_src_addr", "10.0.0.2") + .withField("value", "22") + .build(); + + profile1 = new ProfileConfig() + .withProfile("profile1") + .withForeach("ip_src_addr") + .withInit("x", "0") + .withUpdate("x", "x + 1") + .withResult("x"); + + profile2 = new ProfileConfig() + .withProfile("profile2") + .withForeach("ip_src_addr") + .withInit(Collections.singletonMap("x", "0")) + .withUpdate(Collections.singletonMap("x", "x + 1")) + .withResult("x"); + + measurement = new ProfileMeasurement() + .withEntity("entity1") + .withProfileName("profile1") + .withPeriod(1000, 500, TimeUnit.MILLISECONDS) + .withProfileValue(22); + + flushSignal = new ManualFlushSignal(); + flushSignal.setFlushNow(false); + } + + /** + * The bolt should extract a message and timestamp from a tuple and + * pass that to a {@code MessageDistributor}. + */ + @Test + public void testExtractMessage() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock + MessageDistributor distributor = mock(MessageDistributor.class); + bolt.withMessageDistributor(distributor); + + // create a tuple + final long timestamp1 = 100000000L; + Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1); + + // execute the bolt + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // the message should have been extracted from the tuple and passed to the MessageDistributor + verify(distributor).distribute(any(MessageRoute.class), any()); + } + + + /** + * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor} + * and emit the {@code ProfileMeasurement} values from all active profiles. + */ + @Test + public void testFlushActiveProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock that returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // signal the bolt to flush + flushSignal.setFlushNow(true); + + // execute the bolt + Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // a profile measurement should be emitted by the bolt + List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); + assertEquals(1, measurements.size()); + assertEquals(measurement, measurements.get(0)); + } + + /** + * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted. 
+ */ + @Test + public void testDoNotFlushActiveProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock where flush() returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // there is no flush signal + flushSignal.setFlushNow(false); + + // execute the bolt + Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // nothing should have been emitted + getProfileMeasurements(outputCollector, 0); + } + + /** + * Expired profiles should be flushed regularly, even if no input telemetry + * has been received. + */ + @Test + public void testFlushExpiredProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock where flushExpired() returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // execute test by flushing expired profiles. this is normally triggered by a timer task. + bolt.flushExpired(); + + // a profile measurement should be emitted by the bolt + List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); + assertEquals(1, measurements.size()); + assertEquals(measurement, measurements.get(0)); + } + + /** + * A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each + * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations. 
+ */ + @Test + public void testEmitters() throws Exception { + + // defines the zk configurations accessible from the bolt + ProfilerConfigurations configurations = new ProfilerConfigurations(); + configurations.updateGlobalConfig(Collections.emptyMap()); + + // create the bolt with 3 destinations + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withProfileTimeToLive(30, TimeUnit.MINUTES) + .withPeriodDuration(10, TimeUnit.MINUTES) + .withMaxNumberOfRoutes(Long.MAX_VALUE) + .withZookeeperClient(client) + .withZookeeperCache(cache) + .withEmitter(new TestEmitter("destination1")) + .withEmitter(new TestEmitter("destination2")) + .withEmitter(new TestEmitter("destination3")) + .withProfilerConfigurations(configurations) + .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES)); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + + // signal the bolt to flush + bolt.withFlushSignal(flushSignal); + flushSignal.setFlushNow(true); + + // execute the bolt + Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis()); + TupleWindow window = createWindow(tuple1); + bolt.execute(window); + + // validate measurements emitted to each + verify(outputCollector, times(1)).emit(eq("destination1"), any()); + verify(outputCollector, times(1)).emit(eq("destination2"), any()); + verify(outputCollector, times(1)).emit(eq("destination3"), any()); + } + ++ @Test ++ public void testExceptionWhenFlushingExpiredProfiles() throws Exception { ++ // create an emitter that will throw an exception when emit() called ++ ProfileMeasurementEmitter badEmitter = mock(ProfileMeasurementEmitter.class); ++ doThrow(new RuntimeException("flushExpired() should catch this exception")) ++ .when(badEmitter) ++ .emit(any(), any()); ++ ++ // create a distributor that will return a profile measurement ++ MessageDistributor distributor = mock(MessageDistributor.class); ++ when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement)); ++ ++ // the bolt will use the bad emitter when flushExpired() is called ++ ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() ++ .withEmitter(badEmitter) ++ .withMessageDistributor(distributor); ++ ++ // the exception thrown by the emitter should not bubble up ++ bolt.flushExpired(); ++ } ++ + /** + * Retrieves the ProfileMeasurement(s) (if any) that have been emitted. + * + * @param collector The Storm output collector. + * @param expected The number of measurements expected. + * @return A list of ProfileMeasurement(s). + */ + private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) { + + // the 'streamId' is defined by the DestinationHandler being used by the bolt + final String streamId = emitter.getStreamId(); + + // capture the emitted tuple(s) + ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class); + verify(collector, times(expected)) + .emit(eq(streamId), argCaptor.capture()); + + // return the profile measurements that were emitted + return argCaptor.getAllValues() + .stream() + .map(val -> (ProfileMeasurement) val.get(0)) + .collect(Collectors.toList()); + } + + /** + * Create a tuple that will contain the message, the entity name, and profile definition. + * @param entity The entity name + * @param message The telemetry message. + * @param profile The profile definition. 
+ */ + private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) { + + Tuple tuple = mock(Tuple.class); + when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message); + when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp); + when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity); + when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile); + + return tuple; + } + + /** + * Create a ProfileBuilderBolt to test. + * @return A {@link ProfileBuilderBolt} to test. + */ + private ProfileBuilderBolt createBolt() throws IOException { + + // defines the zk configurations accessible from the bolt + ProfilerConfigurations configurations = new ProfilerConfigurations(); + configurations.updateGlobalConfig(Collections.emptyMap()); + + emitter = new HBaseEmitter(); + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withProfileTimeToLive(30, TimeUnit.MINUTES) + .withMaxNumberOfRoutes(Long.MAX_VALUE) + .withZookeeperClient(client) + .withZookeeperCache(cache) + .withEmitter(emitter) + .withProfilerConfigurations(configurations) + .withPeriodDuration(1, TimeUnit.MINUTES) + .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + + // set the flush signal AFTER calling 'prepare' + bolt.withFlushSignal(flushSignal); + + return bolt; + } + + /** + * Creates a mock TupleWindow containing multiple tuples. + * @param tuples The tuples to add to the window. + */ + private TupleWindow createWindow(Tuple... tuples) { + + TupleWindow window = mock(TupleWindow.class); + when(window.get()).thenReturn(Arrays.asList(tuples)); + return window; + } + + /** + * An implementation for testing purposes only. + */ + private class TestEmitter implements ProfileMeasurementEmitter { + + private String streamId; + + public TestEmitter(String streamId) { + this.streamId = streamId; + } + + @Override + public String getStreamId() { + return streamId; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(getStreamId(), new Fields("measurement")); + } + + @Override + public void emit(ProfileMeasurement measurement, OutputCollector collector) { + collector.emit(getStreamId(), new Values(measurement)); + } + } +}
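Taken together, the tests in ProfileBuilderBoltTest pin down three behaviors of the bolt: nothing is emitted until the FlushSignal fires, every flushed ProfileMeasurement is handed to every configured emitter on its own stream, and an emitter that throws while expired profiles are flushed must not take the bolt down. The sketch below restates that contract in plain Java. It is illustrative only, not the ProfileBuilderBolt source; in particular, FlushSignal.isTimeToFlush() and the FlushBehaviorSketch class are assumptions made for the sketch, and the real bolt may structure this differently.

package org.apache.metron.profiler.storm;

import org.apache.metron.profiler.MessageDistributor;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.storm.task.OutputCollector;

import java.util.List;

/**
 * Illustrative sketch of the flush behavior exercised by ProfileBuilderBoltTest.
 * NOT the ProfileBuilderBolt implementation; FlushSignal.isTimeToFlush() is an
 * assumed method name.
 */
public class FlushBehaviorSketch {

  // testFlushActiveProfiles / testDoNotFlushActiveProfiles / testEmitters:
  // measurements are emitted only when the flush signal fires, and every
  // measurement goes to every configured emitter (for example HBase and Kafka).
  public void flushActiveProfiles(FlushSignal flushSignal,
                                  MessageDistributor distributor,
                                  List<ProfileMeasurementEmitter> emitters,
                                  OutputCollector collector) {
    if (flushSignal.isTimeToFlush()) {
      emitAll(distributor.flush(), emitters, collector);
    }
  }

  // testFlushExpiredProfiles / testExceptionWhenFlushingExpiredProfiles:
  // expired profiles are flushed on a timer even without new telemetry, and an
  // emitter that throws must not bubble the exception up and kill the bolt.
  public void flushExpiredProfiles(MessageDistributor distributor,
                                   List<ProfileMeasurementEmitter> emitters,
                                   OutputCollector collector) {
    try {
      emitAll(distributor.flushExpired(), emitters, collector);
    } catch (Throwable e) {
      // swallowed (and logged by the real bolt) so the timer task keeps running
    }
  }

  private void emitAll(Iterable<ProfileMeasurement> measurements,
                       List<ProfileMeasurementEmitter> emitters,
                       OutputCollector collector) {
    for (ProfileMeasurement measurement : measurements) {
      for (ProfileMeasurementEmitter emitter : emitters) {
        emitter.emit(measurement, collector);
      }
    }
  }
}

Reading the mocks in the tests against this shape makes it easier to see which branch each test case exercises.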
http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java index 182600a,0000000..4389d42 mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java @@@ -1,421 -1,0 +1,478 @@@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm.integration; + +import org.adrianwalker.multilinestring.Multiline; - import org.apache.hadoop.hbase.Cell; - import org.apache.hadoop.hbase.client.Put; - import org.apache.hadoop.hbase.util.Bytes; ++import org.apache.commons.io.FileUtils; +import org.apache.metron.common.Constants; - import org.apache.metron.common.utils.SerDeUtils; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.integration.BaseIntegrationTest; +import org.apache.metron.integration.ComponentRunner; +import org.apache.metron.integration.UnableToStartException; +import org.apache.metron.integration.components.FluxTopologyComponent; +import org.apache.metron.integration.components.KafkaComponent; +import org.apache.metron.integration.components.ZKServerComponent; - import org.apache.metron.profiler.ProfileMeasurement; - import org.apache.metron.profiler.hbase.ColumnBuilder; - import org.apache.metron.profiler.hbase.RowKeyBuilder; - import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; - import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; ++import org.apache.metron.profiler.client.stellar.FixedLookback; ++import org.apache.metron.profiler.client.stellar.GetProfile; ++import org.apache.metron.profiler.client.stellar.WindowLookback; ++import org.apache.metron.statistics.OnlineStatisticsProvider; ++import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; ++import org.apache.metron.stellar.common.StellarStatefulExecutor; ++import org.apache.metron.stellar.dsl.Context; ++import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; +import org.apache.storm.Config; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; + +import java.io.File; +import 
java.io.UnsupportedEncodingException; - import java.util.ArrayList; ++import java.lang.invoke.MethodHandles; +import java.util.Arrays; ++import java.util.Collections; ++import java.util.HashMap; +import java.util.List; ++import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static com.google.code.tempusfugit.temporal.Duration.seconds; +import static com.google.code.tempusfugit.temporal.Timeout.timeout; +import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; - import static org.junit.Assert.assertArrayEquals; ++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY; ++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE; ++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER; ++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD; ++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS; ++import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * An integration test of the Profiler topology. + */ +public class ProfilerIntegrationTest extends BaseIntegrationTest { + ++ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test"; + private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml"; + + public static final long startAt = 10; + public static final String entity = "10.0.0.1"; + + private static final String tableName = "profiler"; + private static final String columnFamily = "P"; + private static final String inputTopic = Constants.INDEXING_TOPIC; + private static final String outputTopic = "profiles"; + private static final int saltDivisor = 10; + - private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1); - private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5); - private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10); - private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15); ++ private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20); ++ private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10); ++ private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10); ++ private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20); ++ + private static final long maxRoutesPerBolt = 100000; + - private static ColumnBuilder columnBuilder; + private static ZKServerComponent zkComponent; + private static FluxTopologyComponent fluxComponent; + private static KafkaComponent kafkaComponent; + private static ConfigUploadComponent configUploadComponent; + private static ComponentRunner runner; + private static MockHTable profilerTable; + + private static String message1; + private static String message2; + private static String message3; + ++ private StellarStatefulExecutor executor; ++ + /** + * [ + * org.apache.metron.profiler.ProfileMeasurement, + * org.apache.metron.profiler.ProfilePeriod, + * org.apache.metron.common.configuration.profiler.ProfileResult, + * org.apache.metron.common.configuration.profiler.ProfileResultExpressions, + * 
org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, + * org.apache.metron.common.configuration.profiler.ProfilerConfig, + * org.apache.metron.common.configuration.profiler.ProfileConfig, + * org.json.simple.JSONObject, ++ * org.json.simple.JSONArray, + * java.util.LinkedHashMap, + * org.apache.metron.statistics.OnlineStatisticsProvider + * ] + */ + @Multiline + private static String kryoSerializers; + - /** - * The Profiler can generate profiles based on processing time. With processing time, - * the Profiler builds profiles based on when the telemetry was processed. - * - * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler - * to use processing time. - */ + @Test + public void testProcessingTime() throws Exception { ++ uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + - // upload the config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); - - // start the topology and write test messages to kafka ++ // start the topology and write 3 test messages to kafka + fluxComponent.submitTopology(); - - // the messages that will be applied to the profile + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); + - // storm needs at least one message to close its event window ++ // retrieve the profile measurement using PROFILE_GET ++ String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; ++ List<Integer> measurements = execute(profileGetExpression, List.class); ++ ++ // need to keep checking for measurements until the profiler has flushed one out + int attempt = 0; - while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { ++ while(measurements.size() == 0 && attempt++ < 10) { + - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); ++ // wait for the profiler to flush ++ long sleep = windowDurationMillis; ++ LOG.debug("Waiting {} millis for profiler to flush", sleep); ++ Thread.sleep(sleep); + - // send another message to help close the current event window ++ // write another message to advance time. this ensures we are testing the 'normal' flush mechanism. ++ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which ++ // will ultimately flush the profile + kafkaComponent.writeMessages(inputTopic, message2); ++ ++ // try again to retrieve the profile measurement using PROFILE_GET ++ measurements = execute(profileGetExpression, List.class); + } + - // validate what was flushed - List<Integer> actuals = read( - profilerTable.getPutLog(), - columnFamily, - columnBuilder.getColumnQualifier("value"), - Integer.class); - assertEquals(1, actuals.size()); - assertTrue(actuals.get(0) >= 3); ++ // expect to see only 1 measurement, but could be more (one for each period) depending on ++ // how long we waited for the flush to occur ++ assertTrue(measurements.size() > 0); ++ ++ // the profile should have counted at least 3 messages; the 3 test messages that were sent. ++ // the count could be higher due to the test messages we sent to advance time. ++ assertTrue(measurements.get(0) >= 3); + } + - /** - * The Profiler can generate profiles using event time. With event time processing, - * the Profiler uses timestamps contained in the source telemetry. 
- * - * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler - * from which field the timestamp should be extracted. - */ + @Test - public void testEventTime() throws Exception { - - // upload the profiler config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test"); ++ public void testProcessingTimeWithTimeToLiveFlush() throws Exception { ++ uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + - // start the topology and write test messages to kafka ++ // start the topology and write 3 test messages to kafka + fluxComponent.submitTopology(); + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); + - // wait until the profile is flushed - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); ++ // wait a bit beyond the window lag before writing another message. this allows storm's window manager to close ++ // the event window, which then lets the profiler processes the previous messages. ++ long sleep = windowLagMillis + periodDurationMillis; ++ LOG.debug("Waiting {} millis before sending message to close window", sleep); ++ Thread.sleep(sleep); ++ kafkaComponent.writeMessages(inputTopic, message3); ++ ++ // retrieve the profile measurement using PROFILE_GET ++ String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; ++ List<Integer> measurements = execute(profileGetExpression, List.class); + - List<Put> puts = profilerTable.getPutLog(); - assertEquals(1, puts.size()); ++ // need to keep checking for measurements until the profiler has flushed one out ++ int attempt = 0; ++ while(measurements.size() == 0 && attempt++ < 10) { ++ ++ // wait for the profiler to flush ++ sleep = windowDurationMillis; ++ LOG.debug("Waiting {} millis for profiler to flush", sleep); ++ Thread.sleep(sleep); ++ ++ // do not write additional messages to advance time. this ensures that we are testing the "time to live" ++ // flush mechanism. the TTL setting defines when the profile will be flushed ++ ++ // try again to retrieve the profile measurement ++ measurements = execute(profileGetExpression, List.class); ++ } ++ ++ // expect to see only 1 measurement, but could be more (one for each period) depending on ++ // how long we waited for the flush to occur ++ assertTrue(measurements.size() > 0); + - // inspect the row key to ensure the profiler used event time correctly. 
the timestamp - // embedded in the row key should match those in the source telemetry - byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); - byte[] actualRowKey = puts.get(0).getRow(); - assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); ++ // the profile should have counted 3 messages; the 3 test messages that were sent ++ assertEquals(3, measurements.get(0).intValue()); ++ } ++ ++ @Test ++ public void testEventTime() throws Exception { ++ uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/event-time-test"); ++ ++ // start the topology and write test messages to kafka ++ fluxComponent.submitTopology(); ++ List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); ++ kafkaComponent.writeMessages(inputTopic, messages); ++ ++ long timestamp = System.currentTimeMillis(); ++ LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp); ++ kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", timestamp)); ++ kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", timestamp)); ++ ++ // create the 'window' that looks up to 5 hours before the max timestamp contained in the test data ++ assign("maxTimestamp", "1530978728982L"); ++ assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); ++ ++ // wait until the profile flushes both periods. the first period will flush immediately as subsequent messages ++ // advance time. the next period contains all of the remaining messages, so there are no other messages to ++ // advance time. because of this the next period only flushes after the time-to-live expires ++ waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, timeout(seconds(90))); ++ { ++ // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1 ++ List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class); ++ assertEquals(14, results.get(0)); ++ assertEquals(12, results.get(1)); ++ } ++ { ++ // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158 ++ List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class); ++ assertEquals(36, results.get(0)); ++ assertEquals(38, results.get(1)); ++ } ++ { ++ // in all there are 50 messages in the first period and 50 messages in the next ++ List results = execute("PROFILE_GET('total-count', 'total', window)", List.class); ++ assertEquals(50, results.get(0)); ++ assertEquals(50, results.get(1)); ++ } + } + + /** + * The result produced by a Profile has to be serializable within Storm. If the result is not + * serializable the topology will crash and burn. + * + * This test ensures that if a profile returns a STATS object created using the STATS_INIT and + * STATS_ADD functions, that it can be correctly serialized and persisted. 
+ */ + @Test + public void testProfileWithStatsObject() throws Exception { - - // upload the profiler config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); ++ uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1); - kafkaComponent.writeMessages(inputTopic, message2); - kafkaComponent.writeMessages(inputTopic, message3); ++ List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); ++ kafkaComponent.writeMessages(inputTopic, messages); + + // wait until the profile is flushed + waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); + - // ensure that a value was persisted in HBase - List<Put> puts = profilerTable.getPutLog(); - assertEquals(1, puts.size()); - - // generate the expected row key. only the profile name, entity, and period are used to generate the row key - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile-with-stats") - .withEntity("global") - .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS); - RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); - byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement); - - // ensure the correct row key was generated - byte[] actualRowKey = puts.get(0).getRow(); - assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); ++ // validate the measurements written by the profiler using `PROFILE_GET` ++ // the 'window' looks up to 5 hours before the max timestamp contained in the test data ++ assign("maxTimestamp", "1530978728982L"); ++ assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); ++ ++ // retrieve the stats stored by the profiler ++ List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class); ++ assertTrue(results.size() > 0); ++ assertTrue(results.get(0) instanceof OnlineStatisticsProvider); + } + + /** + * Generates an error message for when the byte comparison fails. + * + * @param expected The expected value. + * @param actual The actual value. + * @return + * @throws UnsupportedEncodingException + */ + private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException { + return String.format("expected '%s', got '%s'", + new String(expected, "UTF-8"), + new String(actual, "UTF-8")); + } + - /** - * Generates the expected row key. - * - * @param profileName The name of the profile. - * @param entity The entity. - * @param whenMillis A timestamp in epoch milliseconds. - * @return A row key. - */ - private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) { - - // only the profile name, entity, and period are used to generate the row key - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName(profileName) - .withEntity(entity) - .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS); - - // build the row key - RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); - return rowKeyBuilder.rowKey(measurement); - } - - /** - * Reads a value written by the Profiler. - * - * @param family The column family. - * @param qualifier The column qualifier. - * @param clazz The expected type of the value.
- * @param <T> The expected type of the value. - * @return The value written by the Profiler. - */ - private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) { - List<T> results = new ArrayList<>(); - - for(Put put: puts) { - List<Cell> cells = put.get(Bytes.toBytes(family), qualifier); - for(Cell cell : cells) { - T value = SerDeUtils.fromBytes(cell.getValue(), clazz); - results.add(value); - } - } - - return results; ++ private static String getMessage(String ipSource, long timestamp) { ++ return new MessageBuilder() ++ .withField("ip_src_addr", ipSource) ++ .withField("timestamp", timestamp) ++ .build() ++ .toJSONString(); + } + + @BeforeClass + public static void setupBeforeClass() throws UnableToStartException { + + // create some messages that contain a timestamp - a really old timestamp; close to 1970 - message1 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt) - .build() - .toJSONString(); - - message2 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + 100) - .build() - .toJSONString(); - - message3 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + (windowDurationMillis * 2)) - .build() - .toJSONString(); - - columnBuilder = new ValueOnlyColumnBuilder(columnFamily); ++ message1 = getMessage(entity, startAt); ++ message2 = getMessage(entity, startAt + 100); ++ message3 = getMessage(entity, startAt + (windowDurationMillis * 2)); + + // storm topology properties + final Properties topologyProperties = new Properties() {{ + + // storm settings + setProperty("profiler.workers", "1"); + setProperty("profiler.executors", "0"); ++ + setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]"); + setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60"); + setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000"); + + // ensure tuples are serialized during the test, otherwise serialization problems + // will not be found until the topology is run on a cluster with multiple workers + setProperty(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, "true"); + setProperty(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, "false"); + setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers); + + // kafka settings + setProperty("profiler.input.topic", inputTopic); + setProperty("profiler.output.topic", outputTopic); - setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); ++ setProperty("kafka.start", "EARLIEST"); + setProperty("kafka.security.protocol", "PLAINTEXT"); + + // hbase settings + setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor)); + setProperty("profiler.hbase.table", tableName); + setProperty("profiler.hbase.column.family", columnFamily); + setProperty("profiler.hbase.batch", "10"); + setProperty("profiler.hbase.flush.interval.seconds", "1"); + setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName()); + + // profile settings + setProperty("profiler.period.duration", Long.toString(periodDurationMillis)); + setProperty("profiler.period.duration.units", "MILLISECONDS"); + setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis)); + setProperty("profiler.ttl.units", "MILLISECONDS"); + setProperty("profiler.window.duration", Long.toString(windowDurationMillis)); + setProperty("profiler.window.duration.units", "MILLISECONDS"); + setProperty("profiler.window.lag", Long.toString(windowLagMillis)); + setProperty("profiler.window.lag.units", "MILLISECONDS"); + 
setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt)); + }}; + + // create the mock table + profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); + + zkComponent = getZKServerComponent(topologyProperties); + + // create the input and output topics + kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList( + new KafkaComponent.Topic(inputTopic, 1), + new KafkaComponent.Topic(outputTopic, 1))); + + // upload profiler configuration to zookeeper + configUploadComponent = new ConfigUploadComponent() + .withTopologyProperties(topologyProperties); + + // load flux definition for the profiler topology + fluxComponent = new FluxTopologyComponent.Builder() + .withTopologyLocation(new File(FLUX_PATH)) + .withTopologyName("profiler") + .withTopologyProperties(topologyProperties) + .build(); + + // start all components + runner = new ComponentRunner.Builder() + .withComponent("zk",zkComponent) + .withComponent("kafka", kafkaComponent) + .withComponent("config", configUploadComponent) + .withComponent("storm", fluxComponent) + .withMillisecondsBetweenAttempts(15000) + .withNumRetries(10) + .withCustomShutdownOrder(new String[] {"storm","config","kafka","zk"}) + .build(); + runner.start(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + MockHBaseTableProvider.clear(); + if (runner != null) { + runner.stop(); + } + } + + @Before + public void setup() { + // create the mock table + profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); ++ ++ // global properties ++ Map<String, Object> global = new HashMap<String, Object>() {{ ++ put(PROFILER_HBASE_TABLE.getKey(), tableName); ++ put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily); ++ put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName()); ++ ++ // client needs to use the same period duration ++ put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis)); ++ put(PROFILER_PERIOD_UNITS.getKey(), "MILLISECONDS"); ++ ++ // client needs to use the same salt divisor ++ put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor); ++ }}; ++ ++ // create the stellar execution environment ++ executor = new DefaultStellarStatefulExecutor( ++ new SimpleFunctionResolver() ++ .withClass(GetProfile.class) ++ .withClass(FixedLookback.class) ++ .withClass(WindowLookback.class), ++ new Context.Builder() ++ .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) ++ .build()); + } + + @After + public void tearDown() throws Exception { + MockHBaseTableProvider.clear(); + profilerTable.clear(); + if (runner != null) { + runner.reset(); + } + } + + /** + * Uploads config values to Zookeeper. + * @param path The path on the local filesystem to the config values. + * @throws Exception + */ - public void uploadConfig(String path) throws Exception { ++ public void uploadConfigToZookeeper(String path) throws Exception { + configUploadComponent + .withGlobalConfiguration(path) + .withProfilerConfiguration(path) + .update(); + } ++ ++ /** ++ * Assign a value to the result of an expression. ++ * ++ * @param var The variable to assign. ++ * @param expression The expression to execute. ++ */ ++ private void assign(String var, String expression) { ++ executor.assign(var, expression, Collections.emptyMap()); ++ } ++ ++ /** ++ * Execute a Stellar expression. ++ * ++ * @param expression The Stellar expression to execute. ++ * @param clazz ++ * @param <T> ++ * @return The result of executing the Stellar expression. 
++ */ ++ private <T> T execute(String expression, Class<T> clazz) { ++ T results = executor.execute(expression, Collections.emptyMap(), clazz); ++ ++ LOG.debug("{} = {}", expression, results); ++ return results; ++ } +} http://git-wip-us.apache.org/repos/asf/metron/blob/cad2f408/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --cc metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties index 541f368,0000000..1c2359a mode 100644,000000..100644 --- a/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties +++ b/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties @@@ -1,34 -1,0 +1,32 @@@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +# Root logger option +log4j.rootLogger=ERROR, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n - log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter - log4j.appender.stdout.filter.1.StringToMatch=Connection timed out - log4j.appender.stdout.filter.1.AcceptOnMatch=false - log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter - log4j.appender.stdout.filter.2.StringToMatch=Background - log4j.appender.stdout.filter.2.AcceptOnMatch=false ++ ++# uncomment below to help debug tests ++#log4j.logger.org.apache.metron.profiler=ALL ++#log4j.logger.org.apache.storm.windowing=ALL
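One broader note on the ProfilerIntegrationTest changes: the old assertions that decoded row keys and Put logs by hand are replaced by validation through the Profiler client, so the test now reads its own results back with PROFILE_GET against the mock HBase table, configured with the same period duration and salt divisor as the topology. The snippet below condenses the polling idiom from testProcessingTime into one place; it only rearranges code that already appears in the diff, and the helper name waitForFirstFlush is invented here for illustration.

// Condensed from testProcessingTime above; waitForFirstFlush does not exist in the codebase.
// 'executor', 'kafkaComponent', 'inputTopic', 'message2', and 'windowDurationMillis' are the
// fields defined in ProfilerIntegrationTest.
private List<Integer> waitForFirstFlush() throws Exception {
  String expression =
      "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
  List<Integer> measurements =
      executor.execute(expression, Collections.emptyMap(), List.class);

  int attempt = 0;
  while (measurements.isEmpty() && attempt++ < 10) {
    // give the profiler one window duration to flush
    Thread.sleep(windowDurationMillis);

    // writing another message advances processing time so that the normal window flush,
    // rather than the profile time-to-live flush, is what closes out the period
    kafkaComponent.writeMessages(inputTopic, message2);

    // ask again whether a measurement has been written
    measurements = executor.execute(expression, Collections.emptyMap(), List.class);
  }
  return measurements;
}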