Repository: metron Updated Branches: refs/heads/feature/METRON-1699-create-batch-profiler 9455c4ee3 -> 3d84ea429
http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh b/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh deleted file mode 100644 index 6ec78f5..0000000 --- a/metron-analytics/metron-profiler/src/main/scripts/start_profiler_topology.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -METRON_VERSION=${project.version} -METRON_HOME=/usr/metron/$METRON_VERSION -TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar -storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/profiler/remote.yaml --filter $METRON_HOME/config/profiler.properties http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json deleted file mode 100644 index 9d727a3..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/event-time-test/profiler.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": [ - { - "profile": "event-time-test", - "foreach": "ip_src_addr", - "init": { "counter": "0" }, - "update": { "counter": "counter + 1" }, - "result": "counter" - } - ], - "timestampField": "timestamp" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json deleted file mode 100644 index e75ec0f..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/processing-time-test/profiler.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "profiles": [ - { - "profile": "processing-time-test", - "foreach": "ip_src_addr", - "init": { "counter": "0" }, - "update": { "counter": "counter + 1" }, - "result": "counter" - } - ] -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json deleted file mode 100644 index 083e73f..0000000 --- a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "profiles": [ - { - "profile": "profile-with-stats", - "foreach": "'global'", - "init": { "stats": "STATS_INIT()" }, - "update": { "stats": "STATS_ADD(stats, 1)" }, - "result": "stats" - } - ], - "timestampField": "timestamp" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java deleted file mode 100644 index b8949c5..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler.bolt; - -import org.junit.Test; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -/** - * Tests the {@code FixedFrequencyFlushSignal} class. - */ -public class FixedFrequencyFlushSignalTest { - - @Test - public void testSignalFlush() { - - FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000); - - // not time to flush yet - assertFalse(signal.isTimeToFlush()); - - // advance time - signal.update(5000); - - // not time to flush yet - assertFalse(signal.isTimeToFlush()); - - // advance time - signal.update(7000); - - // time to flush - assertTrue(signal.isTimeToFlush()); - } - - @Test - public void testOutOfOrderTimestamps() { - FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000); - - // advance time, out-of-order - signal.update(5000); - signal.update(1000); - signal.update(7000); - signal.update(3000); - - // need to flush @ 5000 + 1000 = 6000. 
if anything > 6000 (even out-of-order), then it should signal a flush - assertTrue(signal.isTimeToFlush()); - } - - @Test(expected = IllegalArgumentException.class) - public void testNegativeFrequency() { - new FixedFrequencyFlushSignal(-1000); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java deleted file mode 100644 index 35ca4d9..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/HBaseEmitterTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.metron.profiler.bolt; - -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Values; -import org.json.simple.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -/** - * Tests the HBaseEmitter class. - */ -public class HBaseEmitterTest { - - /** - * { - * "profile": "profile-one", - * "foreach": "ip_src_addr", - * "init": { "x": "0" }, - * "update": { "x": "x + 1" }, - * "result": "x" - * } - */ - @Multiline - private String profileDefinition; - - private HBaseEmitter emitter; - private ProfileConfig profile; - private OutputCollector collector; - - @Before - public void setup() throws Exception { - emitter = new HBaseEmitter(); - profile = createDefinition(profileDefinition); - collector = Mockito.mock(OutputCollector.class); - } - - /** - * The handler should emit a message containing the result of executing - * the 'result/profile' expression. 
- */ - @Test - public void testEmit() throws Exception { - - // create a measurement that has triage values - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withDefinition(profile) - .withProfileValue(22); - - // execute the test - emitter.emit(measurement, collector); - - // the measurement should be emitted as-is - ProfileMeasurement actual = expectMeasurement(emitter, collector); - assertEquals(measurement, actual); - } - - /** - * Verifies that the emitter does emit a {@code ProfileMeasurement}. - * - * @return The {@code ProfileMeasurement} that was emitted - */ - private ProfileMeasurement expectMeasurement(HBaseEmitter hbaseEmitter, OutputCollector collector) { - - ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(hbaseEmitter.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof ProfileMeasurement); - return (ProfileMeasurement) values.get(0); - } - - /** - * Creates a profile definition based on a string of JSON. - * @param json The string of JSON. 
- */ - private ProfileConfig createDefinition(String json) throws IOException { - return JSONUtils.INSTANCE.load(json, ProfileConfig.class); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java deleted file mode 100644 index 95a2d29..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java +++ /dev/null @@ -1,291 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.metron.profiler.bolt; - -import com.google.common.collect.ImmutableMap; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.statistics.OnlineStatisticsProvider; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Values; -import org.json.simple.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -/** - * Tests the KafkaDestinationHandler. - */ -public class KafkaEmitterTest { - - /** - * { - * "profile": "profile-one-destination", - * "foreach": "ip_src_addr", - * "init": { "x": "0" }, - * "update": { "x": "x + 1" }, - * "result": { - * "profile": "x", - * "triage": { - * "value": "x" - * } - * } - * } - */ - @Multiline - private String profileDefinitionWithTriage; - - private KafkaEmitter kafkaEmitter; - private ProfileConfig profile; - private OutputCollector collector; - - @Before - public void setup() throws Exception { - kafkaEmitter = new KafkaEmitter(); - profile = createDefinition(profileDefinitionWithTriage); - collector = Mockito.mock(OutputCollector.class); - } - - /** - * The handler should emit a message when a result/triage expression(s) has been defined. 
- */ - @Test - public void testEmit() throws Exception { - - // create a measurement that has triage values - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withDefinition(profile) - .withTriageValues(Collections.singletonMap("triage-key", "triage-value")); - - // execute the test - kafkaEmitter.emit(measurement, collector); - - // a message should be emitted - verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), any()); - } - - /** - * The handler should NOT emit a message when there is NO result/triage value(s). - */ - @Test - public void testDoNotEmit() throws Exception { - - // create a measurement with NO triage values - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withDefinition(profile); - - // execute the test - kafkaEmitter.emit(measurement, collector); - - // a message should NOT be emitted - verify(collector, times(0)).emit(eq(kafkaEmitter.getStreamId()), any()); - } - - /** - * Validate that the message generated for Kafka should include the triage value. 
- */ - @Test - public void testTriageValueInMessage() throws Exception { - - // create a measurement that has triage values - ProfileMeasurement measurement = new ProfileMeasurement() - .withDefinition(profile) - .withProfileName(profile.getProfile()) - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", "triage-value")); - - // execute the test - kafkaEmitter.emit(measurement, collector); - JSONObject actual = expectJsonObject(kafkaEmitter, collector); - - // validate the core parts of the message - assertEquals(measurement.getProfileName(), actual.get("profile")); - assertEquals(measurement.getEntity(), actual.get("entity")); - assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); - assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); - assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); - assertEquals("profiler", actual.get("source.type")); - assertNotNull(actual.get("timestamp")); - - // validate that the triage value has been added - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); - } - - /** - * Validate that the message generated for Kafka can include multiple triage values. 
- */ - @Test - public void testMultipleTriageValueInMessage() throws Exception { - - // multiple triage values have been defined - Map<String, Object> triageValues = ImmutableMap.of( - "x", 2, - "y", "4", - "z", 6.0); - - // create a measurement that has multiple triage values - ProfileMeasurement measurement = new ProfileMeasurement() - .withDefinition(profile) - .withProfileName(profile.getProfile()) - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(triageValues); - - // execute the test - kafkaEmitter.emit(measurement, collector); - JSONObject actual = expectJsonObject(kafkaEmitter, collector); - - // validate that ALL of the triage values have been added - assertEquals(measurement.getTriageValues().get("x"), actual.get("x")); - assertEquals(measurement.getTriageValues().get("y"), actual.get("y")); - assertEquals(measurement.getTriageValues().get("z"), actual.get("z")); - } - - /** - * Values destined for Kafka can only be serialized into text, which limits the types of values - * that can result from a triage expression. Only primitive types and Strings are allowed. 
- */ - @Test - public void testInvalidType() throws Exception { - - // create one invalid expression and one valid expression - Map<String, Object> triageValues = ImmutableMap.of( - "invalid", new OnlineStatisticsProvider(), - "valid", 4); - - // create the measurement with a Map as a triage value; this is not allowed - ProfileMeasurement measurement = new ProfileMeasurement() - .withDefinition(profile) - .withProfileName(profile.getProfile()) - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(triageValues); - - // execute the test - kafkaEmitter.emit(measurement, collector); - JSONObject actual = expectJsonObject(kafkaEmitter, collector); - - // validate the core parts of the message still exist - assertEquals(measurement.getProfileName(), actual.get("profile")); - assertEquals(measurement.getEntity(), actual.get("entity")); - assertEquals(measurement.getPeriod().getPeriod(), actual.get("period")); - assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start")); - assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end")); - assertEquals("profiler", actual.get("source.type")); - - // the invalid expression should be skipped and not included in the message - assertFalse(actual.containsKey("invalid")); - - // but the valid expression should still be there - assertEquals(triageValues.get("valid"), actual.get("valid")); - } - - /** - * Values destined for Kafka can only be serialized into text, which limits the types of values - * that can result from a triage expression. Only primitive types and Strings are allowed. 
- */ - @Test - public void testIntegerIsValidType() throws Exception { - - // create a measurement with a triage value that is an integer - ProfileMeasurement measurement = new ProfileMeasurement() - .withDefinition(profile) - .withProfileName(profile.getProfile()) - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", 123)); - - // execute the test - kafkaEmitter.emit(measurement, collector); - JSONObject actual = expectJsonObject(kafkaEmitter, collector); - - // the triage expression is valid - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); - } - - /** - * Values destined for Kafka can only be serialized into text, which limits the types of values - * that can result from a triage expression. Only primitive types and Strings are allowed. - */ - @Test - public void testStringIsValidType() throws Exception { - - // create a measurement with a triage value that is a string - ProfileMeasurement measurement = new ProfileMeasurement() - .withDefinition(profile) - .withProfileName(profile.getProfile()) - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withTriageValues(Collections.singletonMap("triage-key", "value")); - - // execute the test - kafkaEmitter.emit(measurement, collector); - JSONObject actual = expectJsonObject(kafkaEmitter, collector); - - // the triage expression is valid - assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key")); - } - - /** - * Verifies that the KafkaEmitter does emit a JSONObject. 
- * @return The JSONObject that was emitted - */ - private JSONObject expectJsonObject(KafkaEmitter kafkaEmitter, OutputCollector collector) { - - ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class); - verify(collector, times(1)).emit(eq(kafkaEmitter.getStreamId()), arg.capture()); - Values values = arg.getValue(); - assertTrue(values.get(0) instanceof JSONObject); - return (JSONObject) values.get(0); - } - - /** - * Creates a profile definition based on a string of JSON. - * @param json The string of JSON. - */ - private ProfileConfig createDefinition(String json) throws IOException { - return JSONUtils.INSTANCE.load(json, ProfileConfig.class); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java deleted file mode 100644 index 3132ae6..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ /dev/null @@ -1,356 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler.bolt; - -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; -import org.apache.metron.profiler.MessageDistributor; -import org.apache.metron.profiler.MessageRoute; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.profiler.integration.MessageBuilder; -import org.apache.metron.test.bolt.BaseBoltTest; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseWindowedBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.apache.storm.windowing.TupleWindow; -import org.json.simple.JSONObject; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests the ProfileBuilderBolt. 
- */ -public class ProfileBuilderBoltTest extends BaseBoltTest { - - private JSONObject message1; - private JSONObject message2; - private ProfileConfig profile1; - private ProfileConfig profile2; - private ProfileMeasurementEmitter emitter; - private ManualFlushSignal flushSignal; - private ProfileMeasurement measurement; - - @Before - public void setup() throws Exception { - - message1 = new MessageBuilder() - .withField("ip_src_addr", "10.0.0.1") - .withField("value", "22") - .build(); - - message2 = new MessageBuilder() - .withField("ip_src_addr", "10.0.0.2") - .withField("value", "22") - .build(); - - profile1 = new ProfileConfig() - .withProfile("profile1") - .withForeach("ip_src_addr") - .withInit("x", "0") - .withUpdate("x", "x + 1") - .withResult("x"); - - profile2 = new ProfileConfig() - .withProfile("profile2") - .withForeach("ip_src_addr") - .withInit(Collections.singletonMap("x", "0")) - .withUpdate(Collections.singletonMap("x", "x + 1")) - .withResult("x"); - - measurement = new ProfileMeasurement() - .withEntity("entity1") - .withProfileName("profile1") - .withPeriod(1000, 500, TimeUnit.MILLISECONDS) - .withProfileValue(22); - - flushSignal = new ManualFlushSignal(); - flushSignal.setFlushNow(false); - } - - /** - * The bolt should extract a message and timestamp from a tuple and - * pass that to a {@code MessageDistributor}. 
- */ - @Test - public void testExtractMessage() throws Exception { - - ProfileBuilderBolt bolt = createBolt(); - - // create a mock - MessageDistributor distributor = mock(MessageDistributor.class); - bolt.withMessageDistributor(distributor); - - // create a tuple - final long timestamp1 = 100000000L; - Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1); - - // execute the bolt - TupleWindow tupleWindow = createWindow(tuple1); - bolt.execute(tupleWindow); - - // the message should have been extracted from the tuple and passed to the MessageDistributor - verify(distributor).distribute(any(MessageRoute.class), any()); - } - - - /** - * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor} - * and emit the {@code ProfileMeasurement} values from all active profiles. - */ - @Test - public void testFlushActiveProfiles() throws Exception { - - ProfileBuilderBolt bolt = createBolt(); - - // create a mock that returns the profile measurement above - MessageDistributor distributor = mock(MessageDistributor.class); - when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); - bolt.withMessageDistributor(distributor); - - // signal the bolt to flush - flushSignal.setFlushNow(true); - - // execute the bolt - Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); - TupleWindow tupleWindow = createWindow(tuple1); - bolt.execute(tupleWindow); - - // a profile measurement should be emitted by the bolt - List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); - assertEquals(1, measurements.size()); - assertEquals(measurement, measurements.get(0)); - } - - /** - * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted. 
- */ - @Test - public void testDoNotFlushActiveProfiles() throws Exception { - - ProfileBuilderBolt bolt = createBolt(); - - // create a mock where flush() returns the profile measurement above - MessageDistributor distributor = mock(MessageDistributor.class); - when(distributor.flush()).thenReturn(Collections.singletonList(measurement)); - bolt.withMessageDistributor(distributor); - - // there is no flush signal - flushSignal.setFlushNow(false); - - // execute the bolt - Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L); - TupleWindow tupleWindow = createWindow(tuple1); - bolt.execute(tupleWindow); - - // nothing should have been emitted - getProfileMeasurements(outputCollector, 0); - } - - /** - * Expired profiles should be flushed regularly, even if no input telemetry - * has been received. - */ - @Test - public void testFlushExpiredProfiles() throws Exception { - - ProfileBuilderBolt bolt = createBolt(); - - // create a mock where flushExpired() returns the profile measurement above - MessageDistributor distributor = mock(MessageDistributor.class); - when(distributor.flushExpired()).thenReturn(Collections.singletonList(measurement)); - bolt.withMessageDistributor(distributor); - - // execute test by flushing expired profiles. this is normally triggered by a timer task. - bolt.flushExpired(); - - // a profile measurement should be emitted by the bolt - List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1); - assertEquals(1, measurements.size()); - assertEquals(measurement, measurements.get(0)); - } - - /** - * A {@link ProfileMeasurement} is built for each profile/entity pair. The measurement should be emitted to each - * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations. 
- */ - @Test - public void testEmitters() throws Exception { - - // defines the zk configurations accessible from the bolt - ProfilerConfigurations configurations = new ProfilerConfigurations(); - configurations.updateGlobalConfig(Collections.emptyMap()); - - // create the bolt with 3 destinations - ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() - .withProfileTimeToLive(30, TimeUnit.MINUTES) - .withPeriodDuration(10, TimeUnit.MINUTES) - .withMaxNumberOfRoutes(Long.MAX_VALUE) - .withZookeeperClient(client) - .withZookeeperCache(cache) - .withEmitter(new TestEmitter("destination1")) - .withEmitter(new TestEmitter("destination2")) - .withEmitter(new TestEmitter("destination3")) - .withProfilerConfigurations(configurations) - .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES)); - bolt.prepare(new HashMap<>(), topologyContext, outputCollector); - - // signal the bolt to flush - bolt.withFlushSignal(flushSignal); - flushSignal.setFlushNow(true); - - // execute the bolt - Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis()); - TupleWindow window = createWindow(tuple1); - bolt.execute(window); - - // validate measurements emitted to each - verify(outputCollector, times(1)).emit(eq("destination1"), any()); - verify(outputCollector, times(1)).emit(eq("destination2"), any()); - verify(outputCollector, times(1)).emit(eq("destination3"), any()); - } - - /** - * Retrieves the ProfileMeasurement(s) (if any) that have been emitted. - * - * @param collector The Storm output collector. - * @param expected The number of measurements expected. - * @return A list of ProfileMeasurement(s). 
- */ - private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) { - - // the 'streamId' is defined by the DestinationHandler being used by the bolt - final String streamId = emitter.getStreamId(); - - // capture the emitted tuple(s) - ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class); - verify(collector, times(expected)) - .emit(eq(streamId), argCaptor.capture()); - - // return the profile measurements that were emitted - return argCaptor.getAllValues() - .stream() - .map(val -> (ProfileMeasurement) val.get(0)) - .collect(Collectors.toList()); - } - - /** - * Create a tuple that will contain the message, the entity name, and profile definition. - * @param entity The entity name - * @param message The telemetry message. - * @param profile The profile definition. - */ - private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) { - - Tuple tuple = mock(Tuple.class); - when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message); - when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp); - when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity); - when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile); - - return tuple; - } - - /** - * Create a ProfileBuilderBolt to test. - * @return A {@link ProfileBuilderBolt} to test. 
- */ - private ProfileBuilderBolt createBolt() throws IOException { - - // defines the zk configurations accessible from the bolt - ProfilerConfigurations configurations = new ProfilerConfigurations(); - configurations.updateGlobalConfig(Collections.emptyMap()); - - emitter = new HBaseEmitter(); - ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt() - .withProfileTimeToLive(30, TimeUnit.MINUTES) - .withMaxNumberOfRoutes(Long.MAX_VALUE) - .withZookeeperClient(client) - .withZookeeperCache(cache) - .withEmitter(emitter) - .withProfilerConfigurations(configurations) - .withPeriodDuration(1, TimeUnit.MINUTES) - .withTumblingWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS)); - bolt.prepare(new HashMap<>(), topologyContext, outputCollector); - - // set the flush signal AFTER calling 'prepare' - bolt.withFlushSignal(flushSignal); - - return bolt; - } - - /** - * Creates a mock TupleWindow containing multiple tuples. - * @param tuples The tuples to add to the window. - */ - private TupleWindow createWindow(Tuple... tuples) { - - TupleWindow window = mock(TupleWindow.class); - when(window.get()).thenReturn(Arrays.asList(tuples)); - return window; - } - - /** - * An implementation for testing purposes only. 
- */ - private class TestEmitter implements ProfileMeasurementEmitter { - - private String streamId; - - public TestEmitter(String streamId) { - this.streamId = streamId; - } - - @Override - public String getStreamId() { - return streamId; - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(getStreamId(), new Fields("measurement")); - } - - @Override - public void emit(ProfileMeasurement measurement, OutputCollector collector) { - collector.emit(getStreamId(), new Values(measurement)); - } - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java deleted file mode 100644 index 04c774c..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler.bolt; - -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.configuration.profiler.ProfileResult; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.profiler.hbase.RowKeyBuilder; -import org.apache.storm.tuple.Tuple; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests the ProfileHBaseMapper class. - */ -public class ProfileHBaseMapperTest { - - private Tuple tuple; - private ProfileHBaseMapper mapper; - private ProfileMeasurement measurement; - private RowKeyBuilder rowKeyBuilder; - private ProfileConfig profile; - - @Before - public void setup() { - rowKeyBuilder = mock(RowKeyBuilder.class); - - mapper = new ProfileHBaseMapper(); - mapper.setRowKeyBuilder(rowKeyBuilder); - - profile = new ProfileConfig("profile", "ip_src_addr", new ProfileResult("2 + 2")); - - measurement = new ProfileMeasurement() - .withProfileName("profile") - .withEntity("entity") - .withPeriod(20000, 15, TimeUnit.MINUTES) - .withProfileValue(22) - .withDefinition(profile); - - // the tuple will contain the original message - tuple = mock(Tuple.class); - when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement); - } - - /** - * The mapper should return the expiration for a tuple based on the Profile definition. 
- */ - @Test - public void testExpires() throws Exception { - final Long expiresDays = 30L; - profile.setExpires(expiresDays); - - Optional<Long> actual = mapper.getTTL(tuple); - Assert.assertTrue(actual.isPresent()); - Assert.assertEquals(expiresDays, (Long) TimeUnit.MILLISECONDS.toDays(actual.get())); - } - - /** - * The expiration field is optional within a Profile definition. - */ - @Test - public void testExpiresUndefined() throws Exception { - // the TTL should not be defined - Optional<Long> actual = mapper.getTTL(tuple); - Assert.assertFalse(actual.isPresent()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java deleted file mode 100644 index c879b4b..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java +++ /dev/null @@ -1,455 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler.bolt; - -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.configuration.profiler.ProfileConfig; -import org.apache.metron.common.configuration.profiler.ProfilerConfig; -import org.apache.metron.profiler.DefaultMessageRouter; -import org.apache.metron.profiler.clock.FixedClockFactory; -import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.test.bolt.BaseBoltTest; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.Values; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.util.HashMap; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -/** - * Tests the ProfileSplitterBolt. 
- */ -public class ProfileSplitterBoltTest extends BaseBoltTest { - - /** - * { - * "ip_src_addr": "10.0.0.1", - * "ip_dst_addr": "10.0.0.20", - * "protocol": "HTTP", - * "timestamp.custom": 2222222222222, - * "timestamp.string": "3333333333333" - * } - */ - @Multiline - private String input; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "protocol == 'HTTP'", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ] - * } - */ - @Multiline - private String profileWithOnlyIfTrue; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "false", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ] - * } - */ - @Multiline - private String profileWithOnlyIfFalse; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ] - * } - */ - @Multiline - private String profileWithOnlyIfMissing; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "onlyif": "NOT-VALID", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ] - * } - */ - @Multiline - private String profileWithOnlyIfInvalid; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ], - * "timestampField": "timestamp.custom" - * } - */ - @Multiline - private String profileUsingCustomTimestampField; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ], - * "timestampField": "timestamp.missing" - * } - */ - @Multiline - private String profileUsingMissingTimestampField; - - /** - * { - * "profiles": [ - * { - * "profile": "test", - * "foreach": "ip_src_addr", - * "init": {}, - * "update": {}, - * "result": "2" - * } - * ], - * "timestampField": 
"timestamp.string" - * } - */ - @Multiline - private String profileUsingStringTimestampField; - - /** - * { - * "profiles": [ - * ] - * } - */ - @Multiline - private String noProfilesDefined; - - /** - * { - * "profiles": [ - * { - * "profile": "profile1", - * "foreach": "'global'", - * "result": "1" - * }, - * { - * "profile": "profile2", - * "foreach": "'global'", - * "result": "2" - * } - * ] - * } - */ - @Multiline - private String twoProfilesDefined; - - private JSONObject message; - private long timestamp = 3333333; - - @Before - public void setup() throws ParseException { - - // parse the input message - JSONParser parser = new JSONParser(); - message = (JSONObject) parser.parse(input); - - // ensure the tuple returns the expected json message - when(tuple.getBinary(0)).thenReturn(input.getBytes()); - } - - /** - * Ensure that a tuple with the correct fields is emitted to downstream bolts - * when a profile is defined. - */ - @Test - public void testEmitTupleWithOneProfile() throws Exception { - - // setup the bolt and execute a tuple - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // the expected tuple fields - String expectedEntity = "10.0.0.1"; - ProfileConfig expectedConfig = config.getProfiles().get(0); - Values expected = new Values(message, timestamp, expectedEntity, expectedConfig); - - // a tuple should be emitted for the downstream profile builder - verify(outputCollector, times(1)) - .emit(eq(tuple), eq(expected)); - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * If there are two profiles that need the same message, then two tuples should - * be emitted. One tuple for each profile. 
- */ - @Test - public void testEmitTupleWithTwoProfiles() throws Exception { - - // setup the bolt and execute a tuple - ProfilerConfig config = toProfilerConfig(twoProfilesDefined); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // the expected tuple fields - final String expectedEntity = "global"; - { - // a tuple should be emitted for the first profile - ProfileConfig profile1 = config.getProfiles().get(0); - Values expected = new Values(message, timestamp, expectedEntity, profile1); - verify(outputCollector, times(1)) - .emit(eq(tuple), eq(expected)); - } - { - // a tuple should be emitted for the second profile - ProfileConfig profile2 = config.getProfiles().get(1); - Values expected = new Values(message, timestamp, expectedEntity, profile2); - verify(outputCollector, times(1)) - .emit(eq(tuple), eq(expected)); - } - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * No tuples should be emitted, if no profiles are defined. - */ - @Test - public void testNoProfilesDefined() throws Exception { - - // setup the bolt and execute a tuple - ProfilerConfig config = toProfilerConfig(noProfilesDefined); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // no tuple should be emitted - verify(outputCollector, times(0)) - .emit(any(Tuple.class), any()); - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * What happens when a profile's 'onlyif' expression is true? The message - * should be applied to the profile. 
- */ - @Test - public void testOnlyIfTrue() throws Exception { - - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // a tuple should be emitted for the downstream profile builder - verify(outputCollector, times(1)) - .emit(eq(tuple), any(Values.class)); - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * All messages are applied to a profile where 'onlyif' is missing. A profile with no - * 'onlyif' is treated the same as if 'onlyif=true'. - */ - @Test - public void testOnlyIfMissing() throws Exception { - - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // a tuple should be emitted for the downstream profile builder - verify(outputCollector, times(1)) - .emit(eq(tuple), any(Values.class)); - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * What happens when a profile's 'onlyif' expression is false? The message - * should NOT be applied to the profile. - */ - @Test - public void testOnlyIfFalse() throws Exception { - - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // a tuple should NOT be emitted for the downstream profile builder - verify(outputCollector, times(0)) - .emit(any()); - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * The entity associated with a profile is defined with a Stellar expression. That expression - * can refer to any field within the message. - * - * In this case the entity is defined as 'ip_src_addr' which is resolved to '10.0.0.1' based on - * the data contained within the message. 
- */ - @Test - public void testResolveEntityName() throws Exception { - - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // expected values - String expectedEntity = "10.0.0.1"; - ProfileConfig expectedConfig = config.getProfiles().get(0); - Values expected = new Values(message, timestamp, expectedEntity, expectedConfig); - - // a tuple should be emitted for the downstream profile builder - verify(outputCollector, times(1)) - .emit(eq(tuple), eq(expected)); - - // the original tuple should be ack'd - verify(outputCollector, times(1)) - .ack(eq(tuple)); - } - - /** - * What happens when invalid Stellar code is used for 'onlyif'? The invalid profile should be ignored. - */ - @Test - public void testOnlyIfInvalid() throws Exception { - - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // a tuple should NOT be emitted for the downstream profile builder - verify(outputCollector, times(0)) - .emit(any(Values.class)); - } - - @Test - public void testWithNullMessage() throws Exception { - - // ensure the tuple returns null to mimic a null message in kafka - when(tuple.getBinary(0)).thenReturn(null); - - ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid); - ProfileSplitterBolt bolt = createBolt(config); - bolt.execute(tuple); - - // a tuple should NOT be emitted for the downstream profile builder - verify(outputCollector, times(0)) - .emit(any(Values.class)); - - } - - /** - * Creates a ProfilerConfig based on a string containing JSON. - * - * @param configAsJSON The config as JSON. - * @return The ProfilerConfig. 
- * @throws Exception - */ - private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception { - InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8")); - return JSONUtils.INSTANCE.load(in, ProfilerConfig.class); - } - - /** - * Create a ProfileSplitterBolt to test - */ - private ProfileSplitterBolt createBolt(ProfilerConfig config) throws Exception { - - ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL"); - bolt.setCuratorFramework(client); - bolt.setZKCache(cache); - bolt.getConfigurations().updateProfilerConfig(config); - bolt.prepare(new HashMap<>(), topologyContext, outputCollector); - - // set the clock factory AFTER calling prepare to use the fixed clock factory - DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext()); - router.setClockFactory(new FixedClockFactory(timestamp)); - bolt.setRouter(router); - - return bolt; - } - -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java deleted file mode 100644 index b59d0b5..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ConfigUploadComponent.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.metron.profiler.integration; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; -import org.apache.metron.integration.InMemoryComponent; -import org.apache.metron.integration.UnableToStartException; -import org.apache.metron.integration.components.ZKServerComponent; - -import java.util.Properties; - -import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient; -import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromFile; -import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper; -import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromFile; -import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper; - - -/** - * Uploads configuration to Zookeeper. 
- */ -public class ConfigUploadComponent implements InMemoryComponent { - - private Properties topologyProperties; - private String globalConfiguration; - private String profilerConfiguration; - - @Override - public void start() throws UnableToStartException { - try { - upload(); - } catch (Exception e) { - throw new UnableToStartException(e.getMessage(), e); - } - } - - @Override - public void stop() { - // nothing to do - } - - public void update() - throws UnableToStartException { - try { - upload(); - } catch (Exception e) { - throw new UnableToStartException(e.getMessage(), e); - } - } - - /** - * Uploads configuration to Zookeeper. - * @throws Exception - */ - private void upload() throws Exception { - final String zookeeperUrl = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY); - try(CuratorFramework client = getClient(zookeeperUrl)) { - if(client.getState() != CuratorFrameworkState.STARTED) { - client.start(); - } - uploadGlobalConfig(client); - uploadProfilerConfig(client); - } - } - - /** - * Upload the profiler configuration to Zookeeper. - * @param client The zookeeper client. - */ - private void uploadProfilerConfig(CuratorFramework client) throws Exception { - if (profilerConfiguration != null) { - byte[] globalConfig = readProfilerConfigFromFile(profilerConfiguration); - if (globalConfig.length > 0) { - writeProfilerConfigToZookeeper(readProfilerConfigFromFile(profilerConfiguration), client); - } - } - } - - /** - * Upload the global configuration to Zookeeper. - * @param client The zookeeper client. 
- */ - private void uploadGlobalConfig(CuratorFramework client) throws Exception { - if (globalConfiguration != null) { - byte[] globalConfig = readGlobalConfigFromFile(globalConfiguration); - if (globalConfig.length > 0) { - writeGlobalConfigToZookeeper(readGlobalConfigFromFile(globalConfiguration), client); - } - } - } - - public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) { - this.topologyProperties = topologyProperties; - return this; - } - - public ConfigUploadComponent withGlobalConfiguration(String path) { - this.globalConfiguration = path; - return this; - } - - public ConfigUploadComponent withProfilerConfiguration(String path) { - this.profilerConfiguration = path; - return this; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java deleted file mode 100644 index 7e1628b..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.metron.profiler.integration; - -import org.json.simple.JSONObject; - -import java.util.HashMap; -import java.util.Map; - -/** - * Enables simple creation of telemetry messages for testing. - */ -public class MessageBuilder { - - private Map<Object, Object> fields; - - /** - * Create a new {@link MessageBuilder}. - */ - public MessageBuilder() { - this.fields = new HashMap<>(); - } - - /** - * Adds all of the fields from a message to this message. - * - * @param prototype The other message that is treated as a prototype. - * @return A {@link MessageBuilder} - */ - public MessageBuilder withFields(JSONObject prototype) { - prototype.forEach((key, val) -> this.fields.put(key, val)); - return this; - } - - /** - * Adds a field to the message. - * - * @param key The field name. - * @param value The field value. - * @return A {@link MessageBuilder} - */ - public MessageBuilder withField(String key, Object value) { - this.fields.put(key, value); - return this; - } - - /** - * Build the message. - * - * <p>This should be called after defining all of the message fields. - * - * @return The message as a {@link JSONObject}. 
- */ - public JSONObject build() { - return new JSONObject(fields); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java deleted file mode 100644 index 268ce26..0000000 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package org.apache.metron.profiler.integration; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.FieldSerializer; -import org.adrianwalker.multilinestring.Multiline; -import org.apache.commons.io.output.ByteArrayOutputStream; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.metron.common.Constants; -import org.apache.metron.common.utils.SerDeUtils; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.apache.metron.hbase.mock.MockHTable; -import org.apache.metron.integration.BaseIntegrationTest; -import org.apache.metron.integration.ComponentRunner; -import org.apache.metron.integration.UnableToStartException; -import org.apache.metron.integration.components.FluxTopologyComponent; -import org.apache.metron.integration.components.KafkaComponent; -import org.apache.metron.integration.components.ZKServerComponent; -import org.apache.metron.profiler.ProfileMeasurement; -import org.apache.metron.profiler.hbase.ColumnBuilder; -import org.apache.metron.profiler.hbase.RowKeyBuilder; -import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder; -import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder; -import org.apache.storm.Config; -import org.apache.storm.serialization.KryoTupleDeserializer; -import org.apache.storm.serialization.KryoTupleSerializer; -import org.apache.storm.serialization.KryoValuesDeserializer; -import org.apache.storm.serialization.KryoValuesSerializer; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.tuple.Tuple; -import org.apache.storm.tuple.TupleImpl; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.ByteArrayInputStream; -import 
java.io.File; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import static com.google.code.tempusfugit.temporal.Duration.seconds; -import static com.google.code.tempusfugit.temporal.Timeout.timeout; -import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -/** - * An integration test of the Profiler topology. - */ -public class ProfilerIntegrationTest extends BaseIntegrationTest { - - private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler/src/test"; - private static final String FLUX_PATH = "../metron-profiler/src/main/flux/profiler/remote.yaml"; - - public static final long startAt = 10; - public static final String entity = "10.0.0.1"; - - private static final String tableName = "profiler"; - private static final String columnFamily = "P"; - private static final String inputTopic = Constants.INDEXING_TOPIC; - private static final String outputTopic = "profiles"; - private static final int saltDivisor = 10; - - private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1); - private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5); - private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10); - private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15); - private static final long maxRoutesPerBolt = 100000; - - private static ColumnBuilder columnBuilder; - private static ZKServerComponent zkComponent; - private static FluxTopologyComponent fluxComponent; - private static KafkaComponent kafkaComponent; - private static ConfigUploadComponent configUploadComponent; - private static 
ComponentRunner runner; - private static MockHTable profilerTable; - - private static String message1; - private static String message2; - private static String message3; - - /** - * [ - * org.apache.metron.profiler.ProfileMeasurement, - * org.apache.metron.profiler.ProfilePeriod, - * org.apache.metron.common.configuration.profiler.ProfileResult, - * org.apache.metron.common.configuration.profiler.ProfileResultExpressions, - * org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, - * org.apache.metron.common.configuration.profiler.ProfilerConfig, - * org.apache.metron.common.configuration.profiler.ProfileConfig, - * org.json.simple.JSONObject, - * java.util.LinkedHashMap, - * org.apache.metron.statistics.OnlineStatisticsProvider - * ] - */ - @Multiline - private static String kryoSerializers; - - /** - * The Profiler can generate profiles based on processing time. With processing time, - * the Profiler builds profiles based on when the telemetry was processed. - * - * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler - * to use processing time. 
- */ - @Test - public void testProcessingTime() throws Exception { - - // upload the config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/processing-time-test"); - - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - - // the messages that will be applied to the profile - kafkaComponent.writeMessages(inputTopic, message1); - kafkaComponent.writeMessages(inputTopic, message2); - kafkaComponent.writeMessages(inputTopic, message3); - - // storm needs at least one message to close its event window - int attempt = 0; - while(profilerTable.getPutLog().size() == 0 && attempt++ < 10) { - - // sleep, at least beyond the current window - Thread.sleep(windowDurationMillis + windowLagMillis); - - // send another message to help close the current event window - kafkaComponent.writeMessages(inputTopic, message2); - } - - // validate what was flushed - List<Integer> actuals = read( - profilerTable.getPutLog(), - columnFamily, - columnBuilder.getColumnQualifier("value"), - Integer.class); - assertEquals(1, actuals.size()); - assertTrue(actuals.get(0) >= 3); - } - - /** - * The Profiler can generate profiles using event time. With event time processing, - * the Profiler uses timestamps contained in the source telemetry. - * - * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler - * from which field the timestamp should be extracted. 
- */ - @Test - public void testEventTime() throws Exception { - - // upload the profiler config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test"); - - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1); - kafkaComponent.writeMessages(inputTopic, message2); - kafkaComponent.writeMessages(inputTopic, message3); - - // wait until the profile is flushed - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); - - List<Put> puts = profilerTable.getPutLog(); - assertEquals(1, puts.size()); - - // inspect the row key to ensure the profiler used event time correctly. the timestamp - // embedded in the row key should match those in the source telemetry - byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt); - byte[] actualRowKey = puts.get(0).getRow(); - assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); - } - - /** - * The result produced by a Profile has to be serializable within Storm. If the result is not - * serializable the topology will crash and burn. - * - * This test ensures that if a profile returns a STATS object created using the STATS_INIT and - * STATS_ADD functions, that it can be correctly serialized and persisted. 
- */ - @Test - public void testProfileWithStatsObject() throws Exception { - - // upload the profiler config to zookeeper - uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats"); - - // start the topology and write test messages to kafka - fluxComponent.submitTopology(); - kafkaComponent.writeMessages(inputTopic, message1); - kafkaComponent.writeMessages(inputTopic, message2); - kafkaComponent.writeMessages(inputTopic, message3); - - // wait until the profile is flushed - waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90))); - - // ensure that a value was persisted in HBase - List<Put> puts = profilerTable.getPutLog(); - assertEquals(1, puts.size()); - - // generate the expected row key. only the profile name, entity, and period are used to generate the row key - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName("profile-with-stats") - .withEntity("global") - .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS); - RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); - byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement); - - // ensure the correct row key was generated - byte[] actualRowKey = puts.get(0).getRow(); - assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey); - } - - /** - * Generates an error message for if the byte comparison fails. - * - * @param expected The expected value. - * @param actual The actual value. - * @return - * @throws UnsupportedEncodingException - */ - private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException { - return String.format("expected '%s', got '%s'", - new String(expected, "UTF-8"), - new String(actual, "UTF-8")); - } - - /** - * Generates the expected row key. - * - * @param profileName The name of the profile. - * @param entity The entity. - * @param whenMillis A timestamp in epoch milliseconds. 
- * @return A row key. - */ - private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) { - - // only the profile name, entity, and period are used to generate the row key - ProfileMeasurement measurement = new ProfileMeasurement() - .withProfileName(profileName) - .withEntity(entity) - .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS); - - // build the row key - RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS); - return rowKeyBuilder.rowKey(measurement); - } - - /** - * Reads a value written by the Profiler. - * - * @param family The column family. - * @param qualifier The column qualifier. - * @param clazz The expected type of the value. - * @param <T> The expected type of the value. - * @return The value written by the Profiler. - */ - private <T> List<T> read(List<Put> puts, String family, byte[] qualifier, Class<T> clazz) { - List<T> results = new ArrayList<>(); - - for(Put put: puts) { - List<Cell> cells = put.get(Bytes.toBytes(family), qualifier); - for(Cell cell : cells) { - T value = SerDeUtils.fromBytes(cell.getValue(), clazz); - results.add(value); - } - } - - return results; - } - - @BeforeClass - public static void setupBeforeClass() throws UnableToStartException { - - // create some messages that contain a timestamp - a really old timestamp; close to 1970 - message1 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt) - .build() - .toJSONString(); - - message2 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + 100) - .build() - .toJSONString(); - - message3 = new MessageBuilder() - .withField("ip_src_addr", entity) - .withField("timestamp", startAt + (windowDurationMillis * 2)) - .build() - .toJSONString(); - - columnBuilder = new ValueOnlyColumnBuilder(columnFamily); - - // storm topology properties - final Properties topologyProperties = new Properties() 
{{ - - // storm settings - setProperty("profiler.workers", "1"); - setProperty("profiler.executors", "0"); - setProperty(Config.TOPOLOGY_AUTO_CREDENTIALS, "[]"); - setProperty(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, "60"); - setProperty(Config.TOPOLOGY_MAX_SPOUT_PENDING, "100000"); - - // ensure tuples are serialized during the test, otherwise serialization problems - // will not be found until the topology is run on a cluster with multiple workers - setProperty(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, "true"); - setProperty(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, "false"); - setProperty(Config.TOPOLOGY_KRYO_REGISTER, kryoSerializers); - - // kafka settings - setProperty("profiler.input.topic", inputTopic); - setProperty("profiler.output.topic", outputTopic); - setProperty("kafka.start", "UNCOMMITTED_EARLIEST"); - setProperty("kafka.security.protocol", "PLAINTEXT"); - - // hbase settings - setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor)); - setProperty("profiler.hbase.table", tableName); - setProperty("profiler.hbase.column.family", columnFamily); - setProperty("profiler.hbase.batch", "10"); - setProperty("profiler.hbase.flush.interval.seconds", "1"); - setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName()); - - // profile settings - setProperty("profiler.period.duration", Long.toString(periodDurationMillis)); - setProperty("profiler.period.duration.units", "MILLISECONDS"); - setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis)); - setProperty("profiler.ttl.units", "MILLISECONDS"); - setProperty("profiler.window.duration", Long.toString(windowDurationMillis)); - setProperty("profiler.window.duration.units", "MILLISECONDS"); - setProperty("profiler.window.lag", Long.toString(windowLagMillis)); - setProperty("profiler.window.lag.units", "MILLISECONDS"); - setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt)); - }}; - - // create the mock table - profilerTable = 
(MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); - - zkComponent = getZKServerComponent(topologyProperties); - - // create the input and output topics - kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList( - new KafkaComponent.Topic(inputTopic, 1), - new KafkaComponent.Topic(outputTopic, 1))); - - // upload profiler configuration to zookeeper - configUploadComponent = new ConfigUploadComponent() - .withTopologyProperties(topologyProperties); - - // load flux definition for the profiler topology - fluxComponent = new FluxTopologyComponent.Builder() - .withTopologyLocation(new File(FLUX_PATH)) - .withTopologyName("profiler") - .withTopologyProperties(topologyProperties) - .build(); - - // start all components - runner = new ComponentRunner.Builder() - .withComponent("zk",zkComponent) - .withComponent("kafka", kafkaComponent) - .withComponent("config", configUploadComponent) - .withComponent("storm", fluxComponent) - .withMillisecondsBetweenAttempts(15000) - .withNumRetries(10) - .withCustomShutdownOrder(new String[] {"storm","config","kafka","zk"}) - .build(); - runner.start(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - MockHBaseTableProvider.clear(); - if (runner != null) { - runner.stop(); - } - } - - @Before - public void setup() { - // create the mock table - profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily); - } - - @After - public void tearDown() throws Exception { - MockHBaseTableProvider.clear(); - profilerTable.clear(); - if (runner != null) { - runner.reset(); - } - } - - /** - * Uploads config values to Zookeeper. - * @param path The path on the local filesystem to the config values. 
- * @throws Exception - */ - public void uploadConfig(String path) throws Exception { - configUploadComponent - .withGlobalConfiguration(path) - .withProfilerConfiguration(path) - .update(); - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/metron-profiler/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/resources/log4j.properties b/metron-analytics/metron-profiler/src/test/resources/log4j.properties deleted file mode 100644 index 541f368..0000000 --- a/metron-analytics/metron-profiler/src/test/resources/log4j.properties +++ /dev/null @@ -1,34 +0,0 @@ -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -# - -# Root logger option -log4j.rootLogger=ERROR, stdout - -# Direct log messages to stdout -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.Target=System.out -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n -log4j.appender.stdout.filter.1=org.apache.log4j.varia.StringMatchFilter -log4j.appender.stdout.filter.1.StringToMatch=Connection timed out -log4j.appender.stdout.filter.1.AcceptOnMatch=false -log4j.appender.stdout.filter.2=org.apache.log4j.varia.StringMatchFilter -log4j.appender.stdout.filter.2.StringToMatch=Background -log4j.appender.stdout.filter.2.AcceptOnMatch=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-analytics/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/pom.xml b/metron-analytics/pom.xml index 37ee2b0..bcfc6e0 100644 --- a/metron-analytics/pom.xml +++ b/metron-analytics/pom.xml @@ -43,7 +43,7 @@ <module>metron-maas-service</module> <module>metron-maas-common</module> <module>metron-statistics</module> - <module>metron-profiler</module> + <module>metron-profiler-storm</module> <module>metron-profiler-client</module> <module>metron-profiler-common</module> <module>metron-profiler-spark</module> http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-deployment/packaging/docker/deb-docker/pom.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/docker/deb-docker/pom.xml b/metron-deployment/packaging/docker/deb-docker/pom.xml index a0df09a..92d63cc 100644 --- a/metron-deployment/packaging/docker/deb-docker/pom.xml +++ b/metron-deployment/packaging/docker/deb-docker/pom.xml @@ -126,7 +126,7 @@ </includes> </resource> <resource> - <directory>${metron_dir}/metron-analytics/metron-profiler/target/</directory> + 
<directory>${metron_dir}/metron-analytics/metron-profiler-storm/target/</directory> <includes> <include>*.tar.gz</include> </includes> http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-deployment/packaging/docker/rpm-docker/pom.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/docker/rpm-docker/pom.xml b/metron-deployment/packaging/docker/rpm-docker/pom.xml index 0d9c4d1..2220ca8 100644 --- a/metron-deployment/packaging/docker/rpm-docker/pom.xml +++ b/metron-deployment/packaging/docker/rpm-docker/pom.xml @@ -162,7 +162,7 @@ </includes> </resource> <resource> - <directory>${metron_dir}/metron-analytics/metron-profiler/target/</directory> + <directory>${metron_dir}/metron-analytics/metron-profiler-storm/target/</directory> <includes> <include>*.tar.gz</include> </includes> http://git-wip-us.apache.org/repos/asf/metron/blob/3d84ea42/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java index 5240d7a..a7dc248 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java @@ -119,7 +119,7 @@ public class ZKConfigurationsCacheIntegrationTest { @Multiline public static String globalConfig; - public static File profilerDir = new File("../../metron-analytics/metron-profiler/src/test/config/zookeeper"); + public static File profilerDir = new 
File("../../metron-analytics/metron-profiler-storm/src/test/config/zookeeper"); public ConfigurationsCache cache; public ZKServerComponent zkComponent;
