METRON-1748 Improve Storm Profiler Integration Test (nickwallen) closes apache/metron#1174
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/f5f765ca Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/f5f765ca Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/f5f765ca Branch: refs/heads/master Commit: f5f765cafcd985a323b5774ff86fe6d43f6186d2 Parents: 6231074 Author: nickwallen <[email protected]> Authored: Fri Sep 7 08:52:24 2018 -0400 Committer: nickallen <[email protected]> Committed: Fri Sep 7 08:52:24 2018 -0400 ---------------------------------------------------------------------- .../profiler/DefaultMessageDistributor.java | 18 +- .../src/test/resources/log4j.properties | 3 + .../profiler/bolt/ProfileBuilderBolt.java | 55 ++- .../zookeeper/event-time-test/profiler.json | 19 +- .../integration/ProfilerIntegrationTest.java | 365 +++++++++++-------- .../src/test/resources/log4j.properties | 10 +- .../src/test/resources/telemetry.json | 100 +++++ 7 files changed, 383 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index 82f7174..dee6bd8 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -175,10 +175,10 @@ public class DefaultMessageDistributor implements MessageDistributor { */ @Override public List<ProfileMeasurement> flush() { + LOG.debug("About to 
flush active profiles"); // cache maintenance needed here to ensure active profiles will expire - activeCache.cleanUp(); - expiredCache.cleanUp(); + cacheMaintenance(); List<ProfileMeasurement> measurements = flushCache(activeCache); return measurements; @@ -200,10 +200,10 @@ public class DefaultMessageDistributor implements MessageDistributor { */ @Override public List<ProfileMeasurement> flushExpired() { + LOG.debug("About to flush expired profiles"); // cache maintenance needed here to ensure active profiles will expire - activeCache.cleanUp(); - expiredCache.cleanUp(); + cacheMaintenance(); // flush all expired profiles List<ProfileMeasurement> measurements = flushCache(expiredCache); @@ -215,6 +215,16 @@ public class DefaultMessageDistributor implements MessageDistributor { } /** + * Performs cache maintenance on both the active and expired caches. + */ + private void cacheMaintenance() { + activeCache.cleanUp(); + expiredCache.cleanUp(); + + LOG.debug("Cache maintenance complete: activeCacheSize={}, expiredCacheSize={}", activeCache.size(), expiredCache.size()); + } + + /** * Flush all of the profiles maintained in a cache. * * @param cache The cache to flush. 
http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties b/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties index 70be8ae..82b0022 100644 --- a/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties +++ b/metron-analytics/metron-profiler-common/src/test/resources/log4j.properties @@ -26,3 +26,6 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# uncomment below to help debug tests +#log4j.logger.org.apache.metron.profiler=DEBUG http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java index 0d1f27e..6c22b45 100644 --- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java +++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java @@ -20,7 +20,6 @@ package org.apache.metron.profiler.bolt; -import org.apache.commons.collections4.CollectionUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -56,10 +55,12 @@ import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; import 
java.util.ArrayList; import java.util.List; +import java.util.LongSummaryStatistics; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static java.lang.String.format; import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.ENTITY_TUPLE_FIELD; @@ -73,6 +74,12 @@ import static org.apache.metron.profiler.bolt.ProfileSplitterBolt.TIMESTAMP_TUPL * <p>This bolt maintains the state required to build a Profile. When the window * period expires, the data is summarized as a {@link ProfileMeasurement}, all state is * flushed, and the {@link ProfileMeasurement} is emitted. + * + * <p>There are two mechanisms that will cause a profile to flush. As new messages arrive, + * time is advanced. The splitter bolt attaches a timestamp to each message (which can be + * either event or system time.) This advances time and leads to profile measurements + * being flushed. Alternatively, if no messages arrive to advance time, then the "time-to-live" + * mechanism will flush a profile after no messages have been received for some period of time. */ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { @@ -283,16 +290,47 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { .build(); } + /** + * Logs information about the {@link TupleWindow}. + * + * @param window The tuple window. 
+ */ + private void log(TupleWindow window) { + // summarize the newly received tuples + LongSummaryStatistics received = window.get() + .stream() + .map(tuple -> getField(TIMESTAMP_TUPLE_FIELD, tuple, Long.class)) + .collect(Collectors.summarizingLong(Long::longValue)); + + LOG.debug("Tuple(s) received; count={}, min={}, max={}, range={} ms", + received.getCount(), + received.getMin(), + received.getMax(), + received.getMax() - received.getMin()); + + if (window.getExpired().size() > 0) { + // summarize the expired tuples + LongSummaryStatistics expired = window.getExpired() + .stream() + .map(tuple -> getField(TIMESTAMP_TUPLE_FIELD, tuple, Long.class)) + .collect(Collectors.summarizingLong(Long::longValue)); + + LOG.debug("Tuple(s) expired; count={}, min={}, max={}, range={} ms, lag={} ms", + expired.getCount(), + expired.getMin(), + expired.getMax(), + expired.getMax() - expired.getMin(), + received.getMin() - expired.getMin()); + } + } + @Override public void execute(TupleWindow window) { - - LOG.debug("Tuple window contains {} tuple(s), {} expired, {} new", - CollectionUtils.size(window.get()), - CollectionUtils.size(window.getExpired()), - CollectionUtils.size(window.getNew())); + if(LOG.isDebugEnabled()) { + log(window); + } try { - // handle each tuple in the window for(Tuple tuple : window.get()) { handleMessage(tuple); @@ -304,7 +342,6 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { } } catch (Throwable e) { - LOG.error("Unexpected error", e); collector.reportError(e); } @@ -361,7 +398,7 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable { // keep track of time activeFlushSignal.update(timestamp); - + // distribute the message MessageRoute route = new MessageRoute(definition, entity); synchronized (messageDistributor) { http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json 
---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json index 9d727a3..534b7c6 100644 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json +++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json @@ -1,12 +1,19 @@ { + "timestampField": "timestamp", "profiles": [ { - "profile": "event-time-test", + "profile": "count-by-ip", "foreach": "ip_src_addr", - "init": { "counter": "0" }, - "update": { "counter": "counter + 1" }, - "result": "counter" + "init": { "count": 0 }, + "update": { "count" : "count + 1" }, + "result": "count" + }, + { + "profile": "total-count", + "foreach": "'total'", + "init": { "count": 0 }, + "update": { "count": "count + 1" }, + "result": "count" } - ], - "timestampField": "timestamp" + ] } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java index 268ce26..322ba13 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java @@ -20,17 +20,9 @@ package org.apache.metron.profiler.integration; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; 
-import com.esotericsoftware.kryo.serializers.FieldSerializer; import org.adrianwalker.multilinestring.Multiline; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.io.FileUtils; import org.apache.metron.common.Constants; -import org.apache.metron.common.utils.SerDeUtils; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.integration.BaseIntegrationTest; @@ -39,42 +31,44 @@ import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.FluxTopologyComponent; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.profiler.hbase.ColumnBuilder; -import org.apache.metron.profiler.hbase.RowKeyBuilder; -import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; -import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; +import org.apache.metron.profiler.client.stellar.FixedLookback; +import org.apache.metron.profiler.client.stellar.GetProfile; +import org.apache.metron.profiler.client.stellar.WindowLookback; +import org.apache.metron.statistics.OnlineStatisticsProvider; +import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor; +import org.apache.metron.stellar.common.StellarStatefulExecutor; +import org.apache.metron.stellar.dsl.Context; +import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver; import org.apache.storm.Config; -import org.apache.storm.serialization.KryoTupleDeserializer; -import org.apache.storm.serialization.KryoTupleSerializer; -import org.apache.storm.serialization.KryoValuesDeserializer; -import org.apache.storm.serialization.KryoValuesSerializer; -import 
org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; +import java.lang.invoke.MethodHandles; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import static com.google.code.tempusfugit.temporal.Duration.seconds; import static com.google.code.tempusfugit.temporal.Timeout.timeout; import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; -import static org.junit.Assert.assertArrayEquals; +import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY; +import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE; +import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER; +import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD; +import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS; +import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** @@ -82,6 +76,7 @@ import static org.junit.Assert.assertTrue; */ public class ProfilerIntegrationTest extends BaseIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String TEST_RESOURCES = 
"../../metron-analytics/metron-profiler/src/test"; private static final String FLUX_PATH = "../metron-profiler/src/main/flux/profiler/remote.yaml"; @@ -94,13 +89,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static final String outputTopic = "profiles"; private static final int saltDivisor = 10; - private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1); - private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5); - private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10); - private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15); + private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20); + private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10); + private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10); + private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20); + private static final long maxRoutesPerBolt = 100000; - private static ColumnBuilder columnBuilder; private static ZKServerComponent zkComponent; private static FluxTopologyComponent fluxComponent; private static KafkaComponent kafkaComponent; @@ -112,6 +107,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { private static String message2; private static String message3; + private StellarStatefulExecutor executor; + /** * [ * org.apache.metron.profiler.ProfileMeasurement, @@ -122,6 +119,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { * org.apache.metron.common.configuration.profiler.ProfilerConfig, * org.apache.metron.common.configuration.profiler.ProfileConfig, * org.json.simple.JSONObject, + * org.json.simple.JSONArray, * java.util.LinkedHashMap, * org.apache.metron.statistics.OnlineStatisticsProvider * ] @@ -129,78 +127,132 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { @Multiline private static String kryoSerializers; - /** - * 
The Profiler can generate profiles based on processing time. With processing time, - * the Profiler builds profiles based on when the telemetry was processed. - * - * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler - * to use processing time. - */ @Test public void testProcessingTime() throws Exception { + uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); - // upload the config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); - - // start the topology and write test messages to kafka + // start the topology and write 3 test messages to kafka fluxComponent.submitTopology(); - - // the messages that will be applied to the profile kafkaComponent.writeMessages(inputTopic, message1); kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); - // storm needs at least one message to close its event window + // retrieve the profile measurement using PROFILE_GET + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + List<Integer> measurements = execute(profileGetExpression, List.class); + + // need to keep checking for measurements until the profiler has flushed one out int attempt = 0; - while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { + while(measurements.size() == 0 && attempt++ < 10) { - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); + // wait for the profiler to flush + long sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); - // send another message to help close the current event window + // write another message to advance time. this ensures we are testing the 'normal' flush mechanism. 
+ // if we do not send additional messages to advance time, then it is the profile TTL mechanism which + // will ultimately flush the profile kafkaComponent.writeMessages(inputTopic, message2); + + // try again to retrieve the profile measurement using PROFILE_GET + measurements = execute(profileGetExpression, List.class); } - // validate what was flushed - List<Integer> actuals = read( - profilerTable.getPutLog(), - columnFamily, - columnBuilder.getColumnQualifier("value"), - Integer.class); - assertEquals(1, actuals.size()); - assertTrue(actuals.get(0) >= 3); + // expect to see only 1 measurement, but could be more (one for each period) depending on + // how long we waited for the flush to occur + assertTrue(measurements.size() > 0); + + // the profile should have counted at least 3 messages; the 3 test messages that were sent. + // the count could be higher due to the test messages we sent to advance time. + assertTrue(measurements.get(0) >= 3); } - /** - * The Profiler can generate profiles using event time. With event time processing, - * the Profiler uses timestamps contained in the source telemetry. - * - * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler - * from which field the timestamp should be extracted. 
- */ @Test - public void testEventTime() throws Exception { - - // upload the profiler config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test"); + public void testProcessingTimeWithTimeToLiveFlush() throws Exception { + uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); - // start the topology and write test messages to kafka + // start the topology and write 3 test messages to kafka fluxComponent.submitTopology(); kafkaComponent.writeMessages(inputTopic, message1); kafkaComponent.writeMessages(inputTopic, message2); kafkaComponent.writeMessages(inputTopic, message3); - // wait until the profile is flushed - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); + // wait a bit beyond the window lag before writing another message. this allows storm's window manager to close + // the event window, which then lets the profiler process the previous messages. + long sleep = windowLagMillis + periodDurationMillis; + LOG.debug("Waiting {} millis before sending message to close window", sleep); + Thread.sleep(sleep); + kafkaComponent.writeMessages(inputTopic, message3); + + // retrieve the profile measurement using PROFILE_GET + String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))"; + List<Integer> measurements = execute(profileGetExpression, List.class); - List<Put> puts = profilerTable.getPutLog(); - assertEquals(1, puts.size()); + // need to keep checking for measurements until the profiler has flushed one out + int attempt = 0; + while(measurements.size() == 0 && attempt++ < 10) { + + // wait for the profiler to flush + sleep = windowDurationMillis; + LOG.debug("Waiting {} millis for profiler to flush", sleep); + Thread.sleep(sleep); + + // do not write additional messages to advance time. this ensures that we are testing the "time to live" + // flush mechanism. 
the TTL setting defines when the profile will be flushed + + // try again to retrieve the profile measurement + measurements = execute(profileGetExpression, List.class); + } + + // expect to see only 1 measurement, but could be more (one for each period) depending on + // how long we waited for the flush to occur + assertTrue(measurements.size() > 0); - // inspect the row key to ensure the profiler used event time correctly. the timestamp - // embedded in the row key should match those in the source telemetry - byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); - byte[] actualRowKey = puts.get(0).getRow(); - assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); + // the profile should have counted 3 messages; the 3 test messages that were sent + assertEquals(3, measurements.get(0).intValue()); + } + + @Test + public void testEventTime() throws Exception { + uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/event-time-test"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); + kafkaComponent.writeMessages(inputTopic, messages); + + long timestamp = System.currentTimeMillis(); + LOG.debug("Attempting to close window period by sending message with timestamp = {}", timestamp); + kafkaComponent.writeMessages(inputTopic, getMessage("192.168.66.1", timestamp)); + kafkaComponent.writeMessages(inputTopic, getMessage("192.168.138.158", timestamp)); + + // create the 'window' that looks up to 5 hours before the max timestamp contained in the test data + assign("maxTimestamp", "1530978728982L"); + assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); + + // wait until the profile flushes both periods. the first period will flush immediately as subsequent messages + // advance time. 
the next period contains all of the remaining messages, so there are no other messages to + // advance time. because of this the next period only flushes after the time-to-live expires + waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, timeout(seconds(90))); + { + // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1 + List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class); + assertEquals(14, results.get(0)); + assertEquals(12, results.get(1)); + } + { + // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158 + List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class); + assertEquals(36, results.get(0)); + assertEquals(38, results.get(1)); + } + { + // in all there are 50 messages in the first period and 50 messages in the next + List results = execute("PROFILE_GET('total-count', 'total', window)", List.class); + assertEquals(50, results.get(0)); + assertEquals(50, results.get(1)); + } } /** @@ -212,34 +264,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { */ @Test public void testProfileWithStatsObject() throws Exception { - - // upload the profiler config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); + uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); // start the topology and write test messages to kafka fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1); - kafkaComponent.writeMessages(inputTopic, message2); - kafkaComponent.writeMessages(inputTopic, message3); + List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json")); + kafkaComponent.writeMessages(inputTopic, messages); // wait until the profile is flushed waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); - // ensure that a value was persisted in HBase - 
List<Put> puts = profilerTable.getPutLog(); - assertEquals(1, puts.size()); - - // generate the expected row key. only the profile name, entity, and period are used to generate the row key - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile-with-stats") - .withEntity("global") - .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS); - RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); - byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement); - - // ensure the correct row key was generated - byte[] actualRowKey = puts.get(0).getRow(); - assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); + // validate the measurements written by the profiler using `PROFILE_GET` + // the 'window' looks up to 5 hours before the max timestamp contained in the test data + assign("maxTimestamp", "1530978728982L"); + assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)"); + + // retrieve the stats stored by the profiler + List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class); + assertTrue(results.size() > 0); + assertTrue(results.get(0) instanceof OnlineStatisticsProvider); } /** @@ -256,73 +299,21 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { new String(actual, "UTF-8")); } - /** - * Generates the expected row key. - * - * @param profileName The name of the profile. - * @param entity The entity. - * @param whenMillis A timestamp in epoch milliseconds. - * @return A row key. 
- */ - private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) { - - // only the profile name, entity, and period are used to generate the row key - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName(profileName) - .withEntity(entity) - .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS); - - // build the row key - RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); - return rowKeyBuilder.rowKey(measurement); - } - - /** - * Reads a value written by the Profiler. - * - * @param family The column family. - * @param qualifier The column qualifier. - * @param clazz The expected type of the value. - * @param <T> The expected type of the value. - * @return The value written by the Profiler. - */ - private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) { - List<T> results = new ArrayList<>(); - - for(Put put: puts) { - List<Cell> cells = put.get(Bytes.toBytes(family), qualifier); - for(Cell cell : cells) { - T value = SerDeUtils.fromBytes(cell.getValue(), clazz); - results.add(value); - } - } - - return results; + private static String getMessage(String ipSource, long timestamp) { + return new MessageBuilder() + .withField("ip_src_addr", ipSource) + .withField("timestamp", timestamp) + .build() + .toJSONString(); } @BeforeClass public static void setupBeforeClass() throws UnableToStartException { // create some messages that contain a timestamp - a really old timestamp; close to 1970 - message1 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt) - .build() - .toJSONString(); - - message2 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + 100) - .build() - .toJSONString(); - - message3 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + (windowDurationMillis * 2)) - .build() - 
.toJSONString(); - - columnBuilder = new ValueOnlyColumnBuilder(columnFamily); + message1 = getMessage(entity, startAt); + message2 = getMessage(entity, startAt + 100); + message3 = getMessage(entity, startAt + (windowDurationMillis * 2)); // storm topology properties final Properties topologyProperties = new Properties() {{ @@ -330,6 +321,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { // storm settings setProperty("profiler.workers", "1"); setProperty("profiler.executors", "0"); + setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]"); setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60"); setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000"); @@ -343,7 +335,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { // kafka settings setProperty("profiler.input.topic", inputTopic); setProperty("profiler.output.topic", outputTopic); - setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); + setProperty("kafka.start", "EARLIEST"); setProperty("kafka.security.protocol", "PLAINTEXT"); // hbase settings @@ -412,6 +404,30 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { public void setup() { // create the mock table profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); + + // global properties + Map<String, Object> global = new HashMap<String, Object>() {{ + put(PROFILER_HBASE_TABLE.getKey(), tableName); + put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily); + put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName()); + + // client needs to use the same period duration + put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis)); + put(PROFILER_PERIOD_UNITS.getKey(), "MILLISECONDS"); + + // client needs to use the same salt divisor + put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor); + }}; + + // create the stellar execution environment + executor = new DefaultStellarStatefulExecutor( + new SimpleFunctionResolver() + .withClass(GetProfile.class) + 
.withClass(FixedLookback.class) + .withClass(WindowLookback.class), + new Context.Builder() + .with(Context.Capabilities.GLOBAL_CONFIG, () -> global) + .build()); } @After @@ -428,10 +444,35 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest { * @param path The path on the local filesystem to the config values. * @throws Exception */ - public void uploadConfig(String path) throws Exception { + public void uploadConfigToZookeeper(String path) throws Exception { configUploadComponent .withGlobalConfiguration(path) .withProfilerConfiguration(path) .update(); } + + /** + * Assign the result of an expression to a variable. + * + * @param var The variable to assign. + * @param expression The expression to execute. + */ + private void assign(String var, String expression) { + executor.assign(var, expression, Collections.emptyMap()); + } + + /** + * Execute a Stellar expression. + * + * @param expression The Stellar expression to execute. + * @param clazz The expected type of the result. + * @param <T> The expected type of the result. + * @return The result of executing the Stellar expression. 
+ */ + private <T> T execute(String expression, Class<T> clazz) { + T results = executor.execute(expression, Collections.emptyMap(), clazz); + + LOG.debug("{} = {}", expression, results); + return results; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/f5f765ca/metron-analytics/metron-profiler/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/resources/log4j.properties b/metron-analytics/metron-profiler/src/test/resources/log4j.properties index 541f368..1c2359a 100644 --- a/metron-analytics/metron-profiler/src/test/resources/log4j.properties +++ b/metron-analytics/metron-profiler/src/test/resources/log4j.properties @@ -26,9 +26,7 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n -log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter -log4j.appender.stdout.filter.1.StringToMatch=Connection timed out -log4j.appender.stdout.filter.1.AcceptOnMatch=false -log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter -log4j.appender.stdout.filter.2.StringToMatch=Background -log4j.appender.stdout.filter.2.AcceptOnMatch=false \ No newline at end of file + +# uncomment below to help debug tests +#log4j.logger.org.apache.metron.profiler=ALL +#log4j.logger.org.apache.storm.windowing=ALL
