http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE b/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE new file mode 100755 index 0000000..1c05f2d --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/main/resources/META-INF/NOTICE @@ -0,0 +1,92 @@ + +metron-profiler +Copyright 2006-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project contains annotations derived from JCIP-ANNOTATIONS +Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net + +=============================================================================== + +The BracketFinder (package org.apache.commons.math3.optimization.univariate) +and PowellOptimizer (package org.apache.commons.math3.optimization.general) +classes are based on the Python code in module "optimize.py" (version 0.5) +developed by Travis E. Oliphant for the SciPy library (http://www.scipy.org/) +Copyright © 2003-2009 SciPy Developers. +=============================================================================== + +The LinearConstraint, LinearObjectiveFunction, LinearOptimizer, +RelationShip, SimplexSolver and SimplexTableau classes in package +org.apache.commons.math3.optimization.linear include software developed by +Benjamin McCann (http://www.benmccann.com) and distributed with +the following copyright: Copyright 2009 Google Inc. +=============================================================================== + +This product includes software developed by the +University of Chicago, as Operator of Argonne National +Laboratory. 
+The LevenbergMarquardtOptimizer class in package +org.apache.commons.math3.optimization.general includes software +translated from the lmder, lmpar and qrsolv Fortran routines +from the Minpack package +Minpack Copyright Notice (1999) University of Chicago. All rights reserved +=============================================================================== + +The GraggBulirschStoerIntegrator class in package +org.apache.commons.math3.ode.nonstiff includes software translated +from the odex Fortran routine developed by E. Hairer and G. Wanner. +Original source copyright: +Copyright (c) 2004, Ernst Hairer +=============================================================================== + +The EigenDecompositionImpl class in package +org.apache.commons.math3.linear includes software translated +from some LAPACK Fortran routines. Original source copyright: +Copyright (c) 1992-2008 The University of Tennessee. All rights reserved. +=============================================================================== + +The MersenneTwister class in package org.apache.commons.math3.random +includes software translated from the 2002-01-26 version of +the Mersenne-Twister generator written in C by Makoto Matsumoto and Takuji +Nishimura. Original source copyright: +Copyright (C) 1997 - 2002, Makoto Matsumoto and Takuji Nishimura, +All rights reserved +=============================================================================== + +The LocalizedFormatsTest class in the unit tests is an adapted version of +the OrekitMessagesTest class from the orekit library distributed under the +terms of the Apache 2 licence. Original source copyright: +Copyright 2010 CS Systèmes d'Information +=============================================================================== + +The HermiteInterpolator class and its corresponding test have been imported from +the orekit library distributed under the terms of the Apache 2 licence. 
Original +source copyright: +Copyright 2010-2012 CS Systèmes d'Information +=============================================================================== + +The creation of the package "o.a.c.m.analysis.integration.gauss" was inspired +by an original code donated by Sébastien Brisard. +=============================================================================== + +The complete text of licenses and disclaimers associated with the original +sources enumerated above at the time of code translation are in the LICENSE.txt +file. + + The Netty Project + ================= + +Please visit the Netty web site for more information: + + * http://netty.io/ + +Copyright 2011 The Netty Project + +Google Guice - Core Library +Copyright 2006-2011 Google, Inc. + +Google Guice - Extensions - Servlet +Copyright 2006-2011 Google, Inc. +
http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh b/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh new file mode 100644 index 0000000..6ec78f5 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/main/scripts/start_profiler_topology.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +METRON_VERSION=${project.version} +METRON_HOME=/usr/metron/$METRON_VERSION +TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar +storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/profiler/remote.yaml --filter $METRON_HOME/config/profiler.properties http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json new file mode 100644 index 0000000..9d727a3 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json @@ -0,0 +1,12 @@ +{ + "profiles": [ + { + "profile": "event-time-test", + "foreach": "ip_src_addr", + "init": { "counter": "0" }, + "update": { "counter": "counter + 1" }, + "result": "counter" + } + ], + "timestampField": "timestamp" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json new file mode 100644 index 0000000..e75ec0f --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json @@ -0,0 +1,11 @@ +{ + "profiles": [ + { + "profile": "processing-time-test", + "foreach": "ip_src_addr", + "init": { "counter": "0" }, + "update": { "counter": "counter + 1" }, + "result": "counter" + } + ] +} \ No newline at 
end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json new file mode 100644 index 0000000..083e73f --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json @@ -0,0 +1,12 @@ +{ + "profiles": [ + { + "profile": "profile-with-stats", + "foreach": "'global'", + "init": { "stats": "STATS_INIT()" }, + "update": { "stats": "STATS_ADD(stats, 1)" }, + "result": "stats" + } + ], + "timestampField": "timestamp" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java new file mode 100644 index 0000000..8b8813b --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/FixedFrequencyFlushSignalTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@code FixedFrequencyFlushSignal} class. + */ +public class FixedFrequencyFlushSignalTest { + + @Test + public void testSignalFlush() { + + FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000); + + // not time to flush yet + assertFalse(signal.isTimeToFlush()); + + // advance time + signal.update(5000); + + // not time to flush yet + assertFalse(signal.isTimeToFlush()); + + // advance time + signal.update(7000); + + // time to flush + assertTrue(signal.isTimeToFlush()); + } + + @Test + public void testOutOfOrderTimestamps() { + FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000); + + // advance time, out-of-order + signal.update(5000); + signal.update(1000); + signal.update(7000); + signal.update(3000); + + // need to flush @ 5000 + 1000 = 6000. 
if anything > 6000 (even out-of-order), then it should signal a flush + assertTrue(signal.isTimeToFlush()); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeFrequency() { + new FixedFrequencyFlushSignal(-1000); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java new file mode 100644 index 0000000..2f9eca4 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/HBaseEmitterTest.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.storm; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Values; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests the HBaseEmitter class. + */ +public class HBaseEmitterTest { + + /** + * { + * "profile": "profile-one", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x" + * } + */ + @Multiline + private String profileDefinition; + + private HBaseEmitter emitter; + private ProfileConfig profile; + private OutputCollector collector; + + @Before + public void setup() throws Exception { + emitter = new HBaseEmitter(); + profile = createDefinition(profileDefinition); + collector = Mockito.mock(OutputCollector.class); + } + + /** + * The handler should emit a message containing the result of executing + * the 'result/profile' expression. 
+ */ + @Test + public void testEmit() throws Exception { + + // create a measurement with a profile value + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withDefinition(profile) + .withProfileValue(22); + + // execute the test + emitter.emit(measurement, collector); + + // the measurement should be emitted as-is + ProfileMeasurement actual = expectMeasurement(emitter, collector); + assertEquals(measurement, actual); + } + + /** + * Verifies that the emitter does emit a {@code ProfileMeasurement}. + * + * @return The {@code ProfileMeasurement} that was emitted + */ + private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, OutputCollector collector) { + + ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof ProfileMeasurement); + return (ProfileMeasurement) values.get(0); + } + + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. 
+ */ + private ProfileConfig createDefinition(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java new file mode 100644 index 0000000..51ca3a4 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/KafkaEmitterTest.java @@ -0,0 +1,291 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.storm; + +import com.google.common.collect.ImmutableMap; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.statistics.OnlineStatisticsProvider; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests the KafkaEmitter class. + */ +public class KafkaEmitterTest { + + /** + * { + * "profile": "profile-one-destination", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": { + * "profile": "x", + * "triage": { + * "value": "x" + * } + * } + * } + */ + @Multiline + private String profileDefinitionWithTriage; + + private KafkaEmitter kafkaEmitter; + private ProfileConfig profile; + private OutputCollector collector; + + @Before + public void setup() throws Exception { + kafkaEmitter = new KafkaEmitter(); + profile = createDefinition(profileDefinitionWithTriage); + collector = Mockito.mock(OutputCollector.class); + } + + /** + * The handler should emit a message when result/triage expressions have been defined. 
+ */ + @Test + public void testEmit() throws Exception { + + // create a measurement that has triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withDefinition(profile) + .withTriageValues(Collections.singletonMap("triage-key", "triage-value")); + + // execute the test + kafkaEmitter.emit(measurement, collector); + + // a message should be emitted + verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any()); + } + + /** + * The handler should NOT emit a message when there are NO result/triage values. + */ + @Test + public void testDoNotEmit() throws Exception { + + // create a measurement with NO triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withDefinition(profile); + + // execute the test + kafkaEmitter.emit(measurement, collector); + + // a message should NOT be emitted + verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any()); + } + + /** + * Validates that the message generated for Kafka includes the triage value. 
+ */ + @Test + public void testTriageValueInMessage() throws Exception { + + // create a measurement that has triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", "triage-value")); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // validate the core parts of the message + assertEquals(measurement.getProfileName(), actual.get("profile")); + assertEquals(measurement.getEntity(), actual.get("entity")); + assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); + assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); + assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); + assertEquals("profiler", actual.get("source.type")); + assertNotNull(actual.get("timestamp")); + + // validate that the triage value has been added + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + } + + /** + * Validate that the message generated for Kafka can include multiple triage values. 
+ */ + @Test + public void testMultipleTriageValueInMessage() throws Exception { + + // multiple triage values have been defined + Map<String, Object> triageValues = ImmutableMap.of( + "x", 2, + "y", "4", + "z", 6.0); + + // create a measurement that has multiple triage values + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(triageValues); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // validate that ALL of the triage values have been added + assertEquals(measurement.getTriageValues().get("x"), actual.get("x")); + assertEquals(measurement.getTriageValues().get("y"), actual.get("y")); + assertEquals(measurement.getTriageValues().get("z"), actual.get("z")); + } + + /** + * Values destined for Kafka can only be serialized into text, which limits the types of values + * that can result from a triage expression. Only primitive types and Strings are allowed. 
+ */ + @Test + public void testInvalidType() throws Exception { + + // create one invalid expression and one valid expression + Map<String, Object> triageValues = ImmutableMap.of( + "invalid", new OnlineStatisticsProvider(), + "valid", 4); + + // create the measurement with an OnlineStatisticsProvider as a triage value; this is not allowed + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(triageValues); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // validate the core parts of the message still exist + assertEquals(measurement.getProfileName(), actual.get("profile")); + assertEquals(measurement.getEntity(), actual.get("entity")); + assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); + assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); + assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); + assertEquals("profiler", actual.get("source.type")); + + // the invalid expression should be skipped and not included in the message + assertFalse(actual.containsKey("invalid")); + + // but the valid expression should still be there + assertEquals(triageValues.get("valid"), actual.get("valid")); + } + + /** + * Values destined for Kafka can only be serialized into text, which limits the types of values + * that can result from a triage expression. Only primitive types and Strings are allowed. 
+ */ + @Test + public void testIntegerIsValidType() throws Exception { + + // create a measurement with a triage value that is an integer + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", 123)); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // the triage expression is valid + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + } + + /** + * Values destined for Kafka can only be serialized into text, which limits the types of values + * that can result from a triage expression. Only primitive types and Strings are allowed. + */ + @Test + public void testStringIsValidType() throws Exception { + + // create a measurement with a triage value that is a string + ProfileMeasurement measurement = new ProfileMeasurement() + .withDefinition(profile) + .withProfileName(profile.getProfile()) + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withTriageValues(Collections.singletonMap("triage-key", "value")); + + // execute the test + kafkaEmitter.emit(measurement, collector); + JSONObject actual = expectJsonObject(kafkaEmitter, collector); + + // the triage expression is valid + assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); + } + + /** + * Verifies that the KafkaEmitter does emit a JSONObject. 
+ * @return The JSONObject that was emitted + */ + private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, OutputCollector collector) { + + ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); + verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), arg.capture()); + Values values = arg.getValue(); + assertTrue(values.get(0) instanceof JSONObject); + return (JSONObject) values.get(0); + } + + /** + * Creates a profile definition based on a string of JSON. + * @param json The string of JSON. + */ + private ProfileConfig createDefinition(String json) throws IOException { + return JSONUtils.INSTANCE.load(json, ProfileConfig.class); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java new file mode 100644 index 0000000..fc94afa --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileBuilderBoltTest.java @@ -0,0 +1,356 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm; + +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.profiler.MessageDistributor; +import org.apache.metron.profiler.MessageRoute; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.storm.integration.MessageBuilder; +import org.apache.metron.test.bolt.BaseBoltTest; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests the ProfileBuilderBolt. 
+ */ +public class ProfileBuilderBoltTest extends BaseBoltTest { + + private JSONObject message1; + private JSONObject message2; + private ProfileConfig profile1; + private ProfileConfig profile2; + private ProfileMeasurementEmitter emitter; + private ManualFlushSignal flushSignal; + private ProfileMeasurement measurement; + + @Before + public void setup() throws Exception { + + message1 = new MessageBuilder() + .withField("ip_src_addr", "10.0.0.1") + .withField("value", "22") + .build(); + + message2 = new MessageBuilder() + .withField("ip_src_addr", "10.0.0.2") + .withField("value", "22") + .build(); + + profile1 = new ProfileConfig() + .withProfile("profile1") + .withForeach("ip_src_addr") + .withInit("x", "0") + .withUpdate("x", "x + 1") + .withResult("x"); + + profile2 = new ProfileConfig() + .withProfile("profile2") + .withForeach("ip_src_addr") + .withInit(Collections.singletonMap("x", "0")) + .withUpdate(Collections.singletonMap("x", "x + 1")) + .withResult("x"); + + measurement = new ProfileMeasurement() + .withEntity("entity1") + .withProfileName("profile1") + .withPeriod(1000, 500, TimeUnit.MILLISECONDS) + .withProfileValue(22); + + flushSignal = new ManualFlushSignal(); + flushSignal.setFlushNow(false); + } + + /** + * The bolt should extract a message and timestamp from a tuple and + * pass that to a {@code MessageDistributor}. 
+ */ + @Test + public void testExtractMessage() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock + MessageDistributor distributor = mock(MessageDistributor.class); + bolt.withMessageDistributor(distributor); + + // create a tuple + final long timestamp1 = 100000000L; + Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1); + + // execute the bolt + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // the message should have been extracted from the tuple and passed to the MessageDistributor + verify(distributor).distribute(any(MessageRoute.class), any()); + } + + + /** + * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor} + * and emit the {@code ProfileMeasurement} values from all active profiles. + */ + @Test + public void testFlushActiveProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock that returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // signal the bolt to flush + flushSignal.setFlushNow(true); + + // execute the bolt + Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // a profile measurement should be emitted by the bolt + List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); + assertEquals(1, measurements.size()); + assertEquals(measurement, measurements.get(0)); + } + + /** + * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted. 
+ */ + @Test + public void testDoNotFlushActiveProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock where flush() returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // there is no flush signal + flushSignal.setFlushNow(false); + + // execute the bolt + Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); + TupleWindow tupleWindow = createWindow(tuple1); + bolt.execute(tupleWindow); + + // nothing should have been emitted + getProfileMeasurements(outputCollector, 0); + } + + /** + * Expired profiles should be flushed regularly, even if no input telemetry + * has been received. + */ + @Test + public void testFlushExpiredProfiles() throws Exception { + + ProfileBuilderBolt bolt = createBolt(); + + // create a mock where flushExpired() returns the profile measurement above + MessageDistributor distributor = mock(MessageDistributor.class); + when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement)); + bolt.withMessageDistributor(distributor); + + // execute test by flushing expired profiles. this is normally triggered by a timer task. + bolt.flushExpired(); + + // a profile measurement should be emitted by the bolt + List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); + assertEquals(1, measurements.size()); + assertEquals(measurement, measurements.get(0)); + } + + /** + * A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each + * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations. 
+ */ + @Test + public void testEmitters() throws Exception { + + // defines the zk configurations accessible from the bolt + ProfilerConfigurations configurations = new ProfilerConfigurations(); + configurations.updateGlobalConfig(Collections.emptyMap()); + + // create the bolt with 3 destinations + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withProfileTimeToLive(30, TimeUnit.MINUTES) + .withPeriodDuration(10, TimeUnit.MINUTES) + .withMaxNumberOfRoutes(Long.MAX_VALUE) + .withZookeeperClient(client) + .withZookeeperCache(cache) + .withEmitter(new TestEmitter("destination1")) + .withEmitter(new TestEmitter("destination2")) + .withEmitter(new TestEmitter("destination3")) + .withProfilerConfigurations(configurations) + .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES)); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + + // signal the bolt to flush + bolt.withFlushSignal(flushSignal); + flushSignal.setFlushNow(true); + + // execute the bolt + Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis()); + TupleWindow window = createWindow(tuple1); + bolt.execute(window); + + // validate that a measurement is emitted to each destination + verify(outputCollector, times(1)).emit(eq("destination1"), any()); + verify(outputCollector, times(1)).emit(eq("destination2"), any()); + verify(outputCollector, times(1)).emit(eq("destination3"), any()); + } + + /** + * Retrieves the ProfileMeasurement(s) (if any) that have been emitted. + * + * @param collector The Storm output collector. + * @param expected The number of measurements expected. + * @return A list of ProfileMeasurement(s). 
+ */ + private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) { + + // the 'streamId' is defined by the ProfileMeasurementEmitter being used by the bolt + final String streamId = emitter.getStreamId(); + + // capture the emitted tuple(s) + ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class); + verify(collector, times(expected)) + .emit(eq(streamId), argCaptor.capture()); + + // return the profile measurements that were emitted + return argCaptor.getAllValues() + .stream() + .map(val -> (ProfileMeasurement) val.get(0)) + .collect(Collectors.toList()); + } + + /** + * Create a tuple containing the message, the entity name, the profile definition, and the timestamp. + * @param entity The entity name. + * @param message The telemetry message. + * @param profile The profile definition. + * @param timestamp The timestamp of the telemetry message. + */ + private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) { + + Tuple tuple = mock(Tuple.class); + when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message); + when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp); + when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity); + when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile); + + return tuple; + } + + /** + * Create a ProfileBuilderBolt to test. + * @return A {@link ProfileBuilderBolt} to test. 
+ */ + private ProfileBuilderBolt createBolt() throws IOException { + + // defines the zk configurations accessible from the bolt + ProfilerConfigurations configurations = new ProfilerConfigurations(); + configurations.updateGlobalConfig(Collections.emptyMap()); + + emitter = new HBaseEmitter(); + ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() + .withProfileTimeToLive(30, TimeUnit.MINUTES) + .withMaxNumberOfRoutes(Long.MAX_VALUE) + .withZookeeperClient(client) + .withZookeeperCache(cache) + .withEmitter(emitter) + .withProfilerConfigurations(configurations) + .withPeriodDuration(1, TimeUnit.MINUTES) + .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + + // set the flush signal AFTER calling 'prepare' + bolt.withFlushSignal(flushSignal); + + return bolt; + } + + /** + * Creates a mock TupleWindow containing multiple tuples. + * @param tuples The tuples to add to the window. + */ + private TupleWindow createWindow(Tuple... tuples) { + + TupleWindow window = mock(TupleWindow.class); + when(window.get()).thenReturn(Arrays.asList(tuples)); + return window; + } + + /** + * An implementation for testing purposes only. 
+ */ + private class TestEmitter implements ProfileMeasurementEmitter { + + private String streamId; + + public TestEmitter(String streamId) { + this.streamId = streamId; + } + + @Override + public String getStreamId() { + return streamId; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(getStreamId(), new Fields("measurement")); + } + + @Override + public void emit(ProfileMeasurement measurement, OutputCollector collector) { + collector.emit(getStreamId(), new Values(measurement)); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java new file mode 100644 index 0000000..f623d38 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileHBaseMapperTest.java @@ -0,0 +1,93 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm; + +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfileResult; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.hbase.RowKeyBuilder; +import org.apache.storm.tuple.Tuple; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests the ProfileHBaseMapper class. + */ +public class ProfileHBaseMapperTest { + + private Tuple tuple; + private ProfileHBaseMapper mapper; + private ProfileMeasurement measurement; + private RowKeyBuilder rowKeyBuilder; + private ProfileConfig profile; + + @Before + public void setup() { + rowKeyBuilder = mock(RowKeyBuilder.class); + + mapper = new ProfileHBaseMapper(); + mapper.setRowKeyBuilder(rowKeyBuilder); + + profile = new ProfileConfig("profile", "ip_src_addr", new ProfileResult("2 + 2")); + + measurement = new ProfileMeasurement() + .withProfileName("profile") + .withEntity("entity") + .withPeriod(20000, 15, TimeUnit.MINUTES) + .withProfileValue(22) + .withDefinition(profile); + + // the tuple will contain the original message + tuple = mock(Tuple.class); + when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement); + } + + /** + * The mapper should return the expiration for a tuple based on the Profile definition. 
+ */ + @Test + public void testExpires() throws Exception { + final Long expiresDays = 30L; + profile.setExpires(expiresDays); + + Optional<Long> actual = mapper.getTTL(tuple); + Assert.assertTrue(actual.isPresent()); + Assert.assertEquals(expiresDays, (Long) TimeUnit.MILLISECONDS.toDays(actual.get())); + } + + /** + * The expiration field is optional within a Profile definition. + */ + @Test + public void testExpiresUndefined() throws Exception { + // the TTL should not be defined + Optional<Long> actual = mapper.getTTL(tuple); + Assert.assertFalse(actual.isPresent()); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java new file mode 100644 index 0000000..93d2ac4 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/ProfileSplitterBoltTest.java @@ -0,0 +1,455 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.profiler.ProfileConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.profiler.DefaultMessageRouter; +import org.apache.metron.profiler.clock.FixedClockFactory; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.test.bolt.BaseBoltTest; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests the ProfileSplitterBolt. 
+ */ +public class ProfileSplitterBoltTest extends BaseBoltTest { + + /** + * { + * "ip_src_addr": "10.0.0.1", + * "ip_dst_addr": "10.0.0.20", + * "protocol": "HTTP", + * "timestamp.custom": 2222222222222, + * "timestamp.string": "3333333333333" + * } + */ + @Multiline + private String input; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "onlyif": "protocol == 'HTTP'", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ] + * } + */ + @Multiline + private String profileWithOnlyIfTrue; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "onlyif": "false", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ] + * } + */ + @Multiline + private String profileWithOnlyIfFalse; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ] + * } + */ + @Multiline + private String profileWithOnlyIfMissing; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "onlyif": "NOT-VALID", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ] + * } + */ + @Multiline + private String profileWithOnlyIfInvalid; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ], + * "timestampField": "timestamp.custom" + * } + */ + @Multiline + private String profileUsingCustomTimestampField; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ], + * "timestampField": "timestamp.missing" + * } + */ + @Multiline + private String profileUsingMissingTimestampField; + + /** + * { + * "profiles": [ + * { + * "profile": "test", + * "foreach": "ip_src_addr", + * "init": {}, + * "update": {}, + * "result": "2" + * } + * ], + * "timestampField": 
"timestamp.string" + * } + */ + @Multiline + private String profileUsingStringTimestampField; + + /** + * { + * "profiles": [ + * ] + * } + */ + @Multiline + private String noProfilesDefined; + + /** + * { + * "profiles": [ + * { + * "profile": "profile1", + * "foreach": "'global'", + * "result": "1" + * }, + * { + * "profile": "profile2", + * "foreach": "'global'", + * "result": "2" + * } + * ] + * } + */ + @Multiline + private String twoProfilesDefined; + + private JSONObject message; + private long timestamp = 3333333; + + @Before + public void setup() throws ParseException { + + // parse the input message + JSONParser parser = new JSONParser(); + message = (JSONObject) parser.parse(input); + + // ensure the tuple returns the expected json message + when(tuple.getBinary(0)).thenReturn(input.getBytes()); + } + + /** + * Ensure that a tuple with the correct fields is emitted to downstream bolts + * when a profile is defined. + */ + @Test + public void testEmitTupleWithOneProfile() throws Exception { + + // setup the bolt and execute a tuple + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // the expected tuple fields + String expectedEntity = "10.0.0.1"; + ProfileConfig expectedConfig = config.getProfiles().get(0); + Values expected = new Values(message, timestamp, expectedEntity, expectedConfig); + + // a tuple should be emitted for the downstream profile builder + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * If there are two profiles that need the same message, then two tuples should + * be emitted. One tuple for each profile. 
+ */ + @Test + public void testEmitTupleWithTwoProfiles() throws Exception { + + // setup the bolt and execute a tuple + ProfilerConfig config = toProfilerConfig(twoProfilesDefined); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // the expected tuple fields + final String expectedEntity = "global"; + { + // a tuple should be emitted for the first profile + ProfileConfig profile1 = config.getProfiles().get(0); + Values expected = new Values(message, timestamp, expectedEntity, profile1); + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + } + { + // a tuple should be emitted for the second profile + ProfileConfig profile2 = config.getProfiles().get(1); + Values expected = new Values(message, timestamp, expectedEntity, profile2); + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + } + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * No tuples should be emitted if no profiles are defined. + */ + @Test + public void testNoProfilesDefined() throws Exception { + + // setup the bolt and execute a tuple + ProfilerConfig config = toProfilerConfig(noProfilesDefined); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // no tuple should be emitted + verify(outputCollector, times(0)) + .emit(any(Tuple.class), any()); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * What happens when a profile's 'onlyif' expression is true? The message + * should be applied to the profile. 
+ */ + @Test + public void testOnlyIfTrue() throws Exception { + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // a tuple should be emitted for the downstream profile builder + verify(outputCollector, times(1)) + .emit(eq(tuple), any(Values.class)); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * All messages are applied to a profile where 'onlyif' is missing. A profile with no + * 'onlyif' is treated the same as if 'onlyif=true'. + */ + @Test + public void testOnlyIfMissing() throws Exception { + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // a tuple should be emitted for the downstream profile builder + verify(outputCollector, times(1)) + .emit(eq(tuple), any(Values.class)); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * What happens when a profile's 'onlyif' expression is false? The message + * should NOT be applied to the profile. + */ + @Test + public void testOnlyIfFalse() throws Exception { + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // a tuple should NOT be emitted for the downstream profile builder + verify(outputCollector, times(0)) + .emit(any()); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * The entity associated with a profile is defined with a Stellar expression. That expression + * can refer to any field within the message. + * + * In this case the entity is defined as 'ip_src_addr' which is resolved to '10.0.0.1' based on + * the data contained within the message. 
+ */ + @Test + public void testResolveEntityName() throws Exception { + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // expected values + String expectedEntity = "10.0.0.1"; + ProfileConfig expectedConfig = config.getProfiles().get(0); + Values expected = new Values(message, timestamp, expectedEntity, expectedConfig); + + // a tuple should be emitted for the downstream profile builder + verify(outputCollector, times(1)) + .emit(eq(tuple), eq(expected)); + + // the original tuple should be ack'd + verify(outputCollector, times(1)) + .ack(eq(tuple)); + } + + /** + * What happens when invalid Stellar code is used for 'onlyif'? The invalid profile should be ignored. + */ + @Test + public void testOnlyIfInvalid() throws Exception { + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // a tuple should NOT be emitted for the downstream profile builder + verify(outputCollector, times(0)) + .emit(any(Values.class)); + } + + @Test + public void testWithNullMessage() throws Exception { + + // ensure the tuple returns null to mimic a null message in kafka + when(tuple.getBinary(0)).thenReturn(null); + + ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid); + ProfileSplitterBolt bolt = createBolt(config); + bolt.execute(tuple); + + // a tuple should NOT be emitted for the downstream profile builder + verify(outputCollector, times(0)) + .emit(any(Values.class)); + + } + + /** + * Creates a ProfilerConfig based on a string containing JSON. + * + * @param configAsJSON The config as JSON. + * @return The ProfilerConfig. 
+ * @throws Exception + */ + private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception { + InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8")); + return JSONUtils.INSTANCE.load(in, ProfilerConfig.class); + } + + /** + * Create a ProfileSplitterBolt to test + */ + private ProfileSplitterBolt createBolt(ProfilerConfig config) throws Exception { + + ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL"); + bolt.setCuratorFramework(client); + bolt.setZKCache(cache); + bolt.getConfigurations().updateProfilerConfig(config); + bolt.prepare(new HashMap<>(), topologyContext, outputCollector); + + // set the clock factory AFTER calling prepare to use the fixed clock factory + DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext()); + router.setClockFactory(new FixedClockFactory(timestamp)); + bolt.setRouter(router); + + return bolt; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java new file mode 100644 index 0000000..70487a0 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.profiler.storm.integration; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.metron.integration.InMemoryComponent; +import org.apache.metron.integration.UnableToStartException; +import org.apache.metron.integration.components.ZKServerComponent; + +import java.util.Properties; + +import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper; + + +/** + * Uploads configuration to Zookeeper. 
+ */ +public class ConfigUploadComponent implements InMemoryComponent { + + private Properties topologyProperties; + private String globalConfiguration; + private String profilerConfiguration; + + @Override + public void start() throws UnableToStartException { + try { + upload(); + } catch (Exception e) { + throw new UnableToStartException(e.getMessage(), e); + } + } + + @Override + public void stop() { + // nothing to do + } + + public void update() + throws UnableToStartException { + try { + upload(); + } catch (Exception e) { + throw new UnableToStartException(e.getMessage(), e); + } + } + + /** + * Uploads configuration to Zookeeper. + * @throws Exception + */ + private void upload() throws Exception { + final String zookeeperUrl = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY); + try(CuratorFramework client = getClient(zookeeperUrl)) { + if(client.getState() != CuratorFrameworkState.STARTED) { + client.start(); + } + uploadGlobalConfig(client); + uploadProfilerConfig(client); + } + } + + /** + * Upload the profiler configuration to Zookeeper. + * @param client The zookeeper client. + */ + private void uploadProfilerConfig(CuratorFramework client) throws Exception { + if (profilerConfiguration != null) { + byte[] profilerConfig = readProfilerConfigFromFile(profilerConfiguration); + if (profilerConfig.length > 0) { + writeProfilerConfigToZookeeper(profilerConfig, client); + } + } + } + + /** + * Upload the global configuration to Zookeeper. + * @param client The zookeeper client. 
+ */ + private void uploadGlobalConfig(CuratorFramework client) throws Exception { + if (globalConfiguration != null) { + byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration); + if (globalConfig.length > 0) { + writeGlobalConfigToZookeeper(globalConfig, client); + } + } + } + + public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) { + this.topologyProperties = topologyProperties; + return this; + } + + public ConfigUploadComponent withGlobalConfiguration(String path) { + this.globalConfiguration = path; + return this; + } + + public ConfigUploadComponent withProfilerConfiguration(String path) { + this.profilerConfiguration = path; + return this; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java new file mode 100644 index 0000000..17e36e1 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/MessageBuilder.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.metron.profiler.storm.integration; + +import org.json.simple.JSONObject; + +import java.util.HashMap; +import java.util.Map; + +/** + * Enables simple creation of telemetry messages for testing. + */ +public class MessageBuilder { + + private Map<Object, Object> fields; + + /** + * Create a new {@link MessageBuilder}. + */ + public MessageBuilder() { + this.fields = new HashMap<>(); + } + + /** + * Adds all of the fields from a message to this message. + * + * @param prototype The other message that is treated as a prototype. + * @return A {@link MessageBuilder} + */ + public MessageBuilder withFields(JSONObject prototype) { + prototype.forEach((key, val) -> this.fields.put(key, val)); + return this; + } + + /** + * Adds a field to the message. + * + * @param key The field name. + * @param value The field value. + * @return A {@link MessageBuilder} + */ + public MessageBuilder withField(String key, Object value) { + this.fields.put(key, value); + return this; + } + + /** + * Build the message. + * + * <p>This should be called after defining all of the message fields. + * + * @return A {@link MessageBuilder}. 
+ */ + public JSONObject build() { + return new JSONObject(fields); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java new file mode 100644 index 0000000..182600a --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java @@ -0,0 +1,421 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.metron.profiler.storm.integration; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.metron.common.Constants; +import org.apache.metron.common.utils.SerDeUtils; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.integration.BaseIntegrationTest; +import org.apache.metron.integration.ComponentRunner; +import org.apache.metron.integration.UnableToStartException; +import org.apache.metron.integration.components.FluxTopologyComponent; +import org.apache.metron.integration.components.KafkaComponent; +import org.apache.metron.integration.components.ZKServerComponent; +import org.apache.metron.profiler.ProfileMeasurement; +import org.apache.metron.profiler.hbase.ColumnBuilder; +import org.apache.metron.profiler.hbase.RowKeyBuilder; +import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; +import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; +import org.apache.storm.Config; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static com.google.code.tempusfugit.temporal.Duration.seconds; +import static com.google.code.tempusfugit.temporal.Timeout.timeout; +import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * An integration test of the Profiler topology. 
+ */ +public class ProfilerIntegrationTest extends BaseIntegrationTest { + + private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test"; + private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml"; + + public static final long startAt = 10; + public static final String entity = "10.0.0.1"; + + private static final String tableName = "profiler"; + private static final String columnFamily = "P"; + private static final String inputTopic = Constants.INDEXING_TOPIC; + private static final String outputTopic = "profiles"; + private static final int saltDivisor = 10; + + private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1); + private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5); + private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10); + private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15); + private static final long maxRoutesPerBolt = 100000; + + private static ColumnBuilder columnBuilder; + private static ZKServerComponent zkComponent; + private static FluxTopologyComponent fluxComponent; + private static KafkaComponent kafkaComponent; + private static ConfigUploadComponent configUploadComponent; + private static ComponentRunner runner; + private static MockHTable profilerTable; + + private static String message1; + private static String message2; + private static String message3; + + /** + * [ + * org.apache.metron.profiler.ProfileMeasurement, + * org.apache.metron.profiler.ProfilePeriod, + * org.apache.metron.common.configuration.profiler.ProfileResult, + * org.apache.metron.common.configuration.profiler.ProfileResultExpressions, + * org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, + * org.apache.metron.common.configuration.profiler.ProfilerConfig, + * org.apache.metron.common.configuration.profiler.ProfileConfig, + * org.json.simple.JSONObject, + * java.util.LinkedHashMap, + * 
org.apache.metron.statistics.OnlineStatisticsProvider + * ] + */ + @Multiline + private static String kryoSerializers; + + /** + * The Profiler can generate profiles based on processing time. With processing time, + * the Profiler builds profiles according to when the telemetry was processed. + * + * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler + * to use processing time. + */ + @Test + public void testProcessingTime() throws Exception { + + // upload the config to zookeeper + uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + + // the messages that will be applied to the profile + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); + + // Storm needs at least one message to close its event window + int attempt = 0; + while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { + + // sleep, at least beyond the current window + Thread.sleep(windowDurationMillis + windowLagMillis); + + // send another message to help close the current event window + kafkaComponent.writeMessages(inputTopic, message2); + } + + // validate what was flushed + List<Integer> actuals = read( + profilerTable.getPutLog(), + columnFamily, + columnBuilder.getColumnQualifier("value"), + Integer.class); + assertEquals(1, actuals.size()); + assertTrue(actuals.get(0) >= 3); + } + + /** + * The Profiler can generate profiles using event time. With event time processing, + * the Profiler uses timestamps contained in the source telemetry. + * + * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler + * from which field the timestamp should be extracted. 
+ */ + @Test + public void testEventTime() throws Exception { + + // upload the profiler config to zookeeper + uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); + + // wait until the profile is flushed + waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); + + List<Put> puts = profilerTable.getPutLog(); + assertEquals(1, puts.size()); + + // inspect the row key to ensure the profiler used event time correctly. the timestamp + // embedded in the row key should match those in the source telemetry + byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); + byte[] actualRowKey = puts.get(0).getRow(); + assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); + } + + /** + * The result produced by a Profile has to be serializable within Storm. If the result is not + * serializable, the topology will crash and burn. + * + * <p>This test ensures that a profile returning a STATS object, created using the STATS_INIT and + * STATS_ADD functions, can be correctly serialized and persisted. 
+ */ + @Test + public void testProfileWithStatsObject() throws Exception { + + // upload the profiler config to zookeeper + uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); + + // start the topology and write test messages to kafka + fluxComponent.submitTopology(); + kafkaComponent.writeMessages(inputTopic, message1); + kafkaComponent.writeMessages(inputTopic, message2); + kafkaComponent.writeMessages(inputTopic, message3); + + // wait until the profile is flushed + waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); + + // ensure that a value was persisted in HBase + List<Put> puts = profilerTable.getPutLog(); + assertEquals(1, puts.size()); + + // generate the expected row key. only the profile name, entity, and period are used to generate the row key + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName("profile-with-stats") + .withEntity("global") + .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS); + RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); + byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement); + + // ensure the correct row key was generated + byte[] actualRowKey = puts.get(0).getRow(); + assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); + } + + /** + * Generates an error message for when a byte comparison fails. + * + * @param expected The expected value. + * @param actual The actual value. + * @return A message describing the expected and actual values. + * @throws UnsupportedEncodingException If the UTF-8 charset is not supported. + */ + private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException { + return String.format("expected '%s', got '%s'", + new String(expected, "UTF-8"), + new String(actual, "UTF-8")); + } + + /** + * Generates the expected row key. + * + * @param profileName The name of the profile. + * @param entity The entity. + * @param whenMillis A timestamp in epoch milliseconds. 
+ * @return A row key. + */ + private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) { + + // only the profile name, entity, and period are used to generate the row key + ProfileMeasurement measurement = new ProfileMeasurement() + .withProfileName(profileName) + .withEntity(entity) + .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS); + + // build the row key + RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); + return rowKeyBuilder.rowKey(measurement); + } + + /** + * Reads the values written by the Profiler. + * + * @param puts The puts written to HBase. + * @param family The column family. + * @param qualifier The column qualifier. + * @param clazz The expected type of the value. + * @param <T> The expected type of the value. + * @return The values written by the Profiler. + */ + private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) { + List<T> results = new ArrayList<>(); + + for(Put put: puts) { + List<Cell> cells = put.get(Bytes.toBytes(family), qualifier); + for(Cell cell : cells) { + T value = SerDeUtils.fromBytes(cell.getValue(), clazz); + results.add(value); + } + } + + return results; + } + + @BeforeClass + public static void setupBeforeClass() throws UnableToStartException { + + // create some messages that contain a timestamp; a really old one, close to the epoch (1970) + message1 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt) + .build() + .toJSONString(); + + message2 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt + 100) + .build() + .toJSONString(); + + message3 = new MessageBuilder() + .withField("ip_src_addr", entity) + .withField("timestamp", startAt + (windowDurationMillis * 2)) + .build() + .toJSONString(); + + columnBuilder = new ValueOnlyColumnBuilder(columnFamily); + + // storm topology properties + final Properties topologyProperties = new Properties() 
{{ + + // storm settings + setProperty("profiler.workers", "1"); + setProperty("profiler.executors", "0"); + setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]"); + setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60"); + setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000"); + + // ensure tuples are serialized during the test, otherwise serialization problems + // will not be found until the topology is run on a cluster with multiple workers + setProperty(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, "true"); + setProperty(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, "false"); + setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers); + + // kafka settings + setProperty("profiler.input.topic", inputTopic); + setProperty("profiler.output.topic", outputTopic); + setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); + setProperty("kafka.security.protocol", "PLAINTEXT"); + + // hbase settings + setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor)); + setProperty("profiler.hbase.table", tableName); + setProperty("profiler.hbase.column.family", columnFamily); + setProperty("profiler.hbase.batch", "10"); + setProperty("profiler.hbase.flush.interval.seconds", "1"); + setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName()); + + // profile settings + setProperty("profiler.period.duration", Long.toString(periodDurationMillis)); + setProperty("profiler.period.duration.units", "MILLISECONDS"); + setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis)); + setProperty("profiler.ttl.units", "MILLISECONDS"); + setProperty("profiler.window.duration", Long.toString(windowDurationMillis)); + setProperty("profiler.window.duration.units", "MILLISECONDS"); + setProperty("profiler.window.lag", Long.toString(windowLagMillis)); + setProperty("profiler.window.lag.units", "MILLISECONDS"); + setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt)); + }}; + + // create the mock table + profilerTable = 
(MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); + + zkComponent = getZKServerComponent(topologyProperties); + + // create the input and output topics + kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList( + new KafkaComponent.Topic(inputTopic, 1), + new KafkaComponent.Topic(outputTopic, 1))); + + // upload profiler configuration to zookeeper + configUploadComponent = new ConfigUploadComponent() + .withTopologyProperties(topologyProperties); + + // load flux definition for the profiler topology + fluxComponent = new FluxTopologyComponent.Builder() + .withTopologyLocation(new File(FLUX_PATH)) + .withTopologyName("profiler") + .withTopologyProperties(topologyProperties) + .build(); + + // start all components + runner = new ComponentRunner.Builder() + .withComponent("zk",zkComponent) + .withComponent("kafka", kafkaComponent) + .withComponent("config", configUploadComponent) + .withComponent("storm", fluxComponent) + .withMillisecondsBetweenAttempts(15000) + .withNumRetries(10) + .withCustomShutdownOrder(new String[] {"storm","config","kafka","zk"}) + .build(); + runner.start(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + MockHBaseTableProvider.clear(); + if (runner != null) { + runner.stop(); + } + } + + @Before + public void setup() { + // create the mock table + profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); + } + + @After + public void tearDown() throws Exception { + MockHBaseTableProvider.clear(); + profilerTable.clear(); + if (runner != null) { + runner.reset(); + } + } + + /** + * Uploads config values to Zookeeper. + * @param path The path on the local filesystem to the config values. 
+ * @throws Exception + */ + public void uploadConfig(String path) throws Exception { + configUploadComponent + .withGlobalConfiguration(path) + .withProfilerConfiguration(path) + .update(); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties b/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties new file mode 100644 index 0000000..541f368 --- /dev/null +++ b/metron-analytics/metron-profiler-storm/src/test/resources/log4j.properties @@ -0,0 +1,34 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# + +# Root logger option +log4j.rootLogger=ERROR, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n +log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter +log4j.appender.stdout.filter.1.StringToMatch=Connection timed out +log4j.appender.stdout.filter.1.AcceptOnMatch=false +log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter +log4j.appender.stdout.filter.2.StringToMatch=Background +log4j.appender.stdout.filter.2.AcceptOnMatch=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/.gitignore ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/.gitignore b/metron-analytics/metron-profiler/.gitignore deleted file mode 100644 index df1a13b..0000000 --- a/metron-analytics/metron-profiler/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/logs \ No newline at end of file