http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java deleted file mode 100644 index 5bf0410..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Random; - -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.junit.Assert; -import org.junit.Test; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -public class DedupCacheTest { - - @Test - public void testNormal() throws Exception { - Config config = ConfigFactory.load(); - DedupCache dedupCache = new DedupCache(config, "testPublishment"); - - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy"); - - String[] states = new String[] {"OPEN", "WARN", "CLOSE"}; - Random random = new Random(); - for (int i = 0; i < 20; i++) { - AlertStreamEvent event = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", states[random.nextInt(3)], 0, 0 - }); - HashMap<String, String> dedupFieldValues = new HashMap<String, String>(); - dedupFieldValues.put("alertKey", (String) event.getData()[event.getSchema().getColumnIndex("alertKey")]); - List<AlertStreamEvent> result = dedupCache.dedup(event, - new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), dedupFieldValues), - "state", - (String) event.getData()[event.getSchema().getColumnIndex("state")], "closed"); - System.out.println((i + 1) + " >>>> " + ToStringBuilder.reflectionToString(result)); - } - - Assert.assertTrue(true); - } - - private AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(policy.getName()); - event.setSchema(stream); - event.setStreamId(stream.getStreamId()); - event.setTimestamp(System.currentTimeMillis()); - event.setCreatedTime(System.currentTimeMillis()); - event.setData(data); - return event; - } - - private StreamDefinition createStream() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn alertKeyColumn = new StreamColumn(); - alertKeyColumn.setName("alertKey"); - alertKeyColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - // dedupCount, dedupFirstOccurrence - - StreamColumn dedupCountColumn = new StreamColumn(); - dedupCountColumn.setName("dedupCount"); - dedupCountColumn.setType(StreamColumn.Type.LONG); - - StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); - dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); - dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, dedupFirstOccurrenceColumn)); - sd.setDataSource("testDatasource"); - sd.setStreamId("testStream"); - sd.setDescription("test stream"); - return sd; - } - - private PolicyDefinition createPolicy(String streamName, String policyName) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - //expression, something like "PT5S,dynamic,1,host" - def.setValue("test"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList("inputStream")); - pd.setOutputStreams(Arrays.asList("outputStream")); - pd.setName(policyName); - pd.setDescription(String.format("Test policy for stream %s", streamName)); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamName); - sp.setColumns(Arrays.asList("host")); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - return pd; - } - -}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java deleted file mode 100644 index c48df9a..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import com.google.common.base.Joiner; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentLinkedDeque; - -import static org.apache.eagle.alert.engine.publisher.AlertPublisherTestHelper.*; - -public class DefaultDedupWithoutStateTest { - - @Test - public void testNormal() throws Exception { - //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue - // assume state: OPEN, WARN, CLOSE - System.setProperty("config.resource", "/application-mongo-statestore.conf"); - Config config = ConfigFactory.load(); - DedupCache dedupCache = new DedupCache(config, "testPublishment"); - DefaultDeduplicator deduplicator = new DefaultDeduplicator( - "PT10S", Arrays.asList(new String[] {"alertKey"}), null, null, dedupCache); - - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), "testPolicy"); - - int[] hostIndex = new int[] {1, 2, 3}; - String[] states = new String[] {"OPEN", "WARN", "CLOSE"}; - Random random = new Random(); - - final ConcurrentLinkedDeque<AlertStreamEvent> nonDedupResult = new ConcurrentLinkedDeque<AlertStreamEvent>(); - - for (int i = 0; i < 100; i++) { - new Thread(new Runnable() { - - @Override - public void run() { - int index = hostIndex[random.nextInt(3)]; - AlertStreamEvent e1 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host" + index, - String.format("testPolicy-host%s-01", index), - states[random.nextInt(3)], 0, 0 - }); - List<AlertStreamEvent> result = deduplicator.dedup(e1); - if (result != null) { - System.out.println(">>>" + Joiner.on(",").join(result)); - nonDedupResult.addAll(result); - } else { - System.out.println(">>>" + result); - } - } - - }).start(); - } - - Thread.sleep(1000); - - System.out.println("old size: " + nonDedupResult.size()); - Assert.assertTrue(nonDedupResult.size() > 0 && nonDedupResult.size() <= 3); - - Thread.sleep(15000); - - for (int i = 0; i < 100; i++) { - new Thread(new Runnable() { - - @Override - public void run() { - int index = hostIndex[random.nextInt(3)]; - AlertStreamEvent e1 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host" + index, - String.format("testPolicy-host%s-01", index), - states[random.nextInt(3)], 0, 0 - }); - List<AlertStreamEvent> result = deduplicator.dedup(e1); - if (result != null) { - System.out.println(">>>" + Joiner.on(",").join(result)); - nonDedupResult.addAll(result); - } else { - System.out.println(">>>" + result); - } - } - - }).start(); - } - - Thread.sleep(1000); - - System.out.println("new size: " + nonDedupResult.size()); - Assert.assertTrue(nonDedupResult.size() > 3 && nonDedupResult.size() <= 6); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java deleted file mode 100644 index 297b790..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.commons.lang.builder.ToStringBuilder; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.eagle.alert.engine.publisher.AlertPublisherTestHelper.*; - -public class DefaultDeduplicatorTest { - - @Test - public void testNormal() throws Exception { - //String intervalMin, List<String> customDedupFields, String dedupStateField, String dedupStateCloseValue - // assume state: OPEN, WARN, CLOSE - System.setProperty("config.resource", "/application-mongo-statestore.conf"); - Config config = ConfigFactory.load(); - DedupCache dedupCache = new DedupCache(config, "testPublishment"); - DefaultDeduplicator deduplicator = new DefaultDeduplicator( - "PT1M", Arrays.asList(new String[] {"alertKey"}), "state", "close", dedupCache); - - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), "testPolicy"); - - AlertStreamEvent e1 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e2 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0 - }); - AlertStreamEvent e3 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e4 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "WARN", 0, 0 - }); - AlertStreamEvent e5 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "CLOSE", 0, 0 - }); - AlertStreamEvent e6 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e7 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - AlertStreamEvent e8 = createEvent(stream, policy, new Object[] { - System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0 - }); - - List<AlertStreamEvent> allResults = new ArrayList<AlertStreamEvent>(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e1); - if (result != null) { - allResults.addAll(result); - } - System.out.println("1 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e2); - if (result != null) { - allResults.addAll(result); - } - System.out.println("2 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e3); - if (result != null) { - allResults.addAll(result); - } - System.out.println("3 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e4); - if (result != null) { - allResults.addAll(result); - } - System.out.println("4 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - } - - List<AlertStreamEvent> result = deduplicator.dedup(e5); - if (result != null) { - allResults.addAll(result); - } - System.out.println("5 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e6); - if (result != null) { - allResults.addAll(result); - } - System.out.println("6 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e7); - if (result != null) { - allResults.addAll(result); - } - System.out.println("7 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - new Thread(new Runnable() { - @Override - public void run() { - List<AlertStreamEvent> result = deduplicator.dedup(e8); - if (result != null) { - allResults.addAll(result); - } - System.out.println("8 >>>> " + ToStringBuilder.reflectionToString(result)); - } - }).start(); - - Thread.sleep(2000); - - long maxCount = 0; - for (AlertStreamEvent event : allResults) { - Assert.assertNotNull(event.getData()[4]); - Assert.assertNotNull(event.getData()[5]); - - if (((Long) event.getData()[4]) > maxCount) { - maxCount = (Long) event.getData()[4]; - System.out.println(String.format(">>>>>%s: %s", event, maxCount)); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java deleted file mode 100644 index a788646..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/ExtendedDeduplicatorTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import java.util.Arrays; -import java.util.List; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; -import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory; -import org.apache.eagle.alert.engine.router.TestAlertPublisherBolt; -import org.junit.Assert; -import org.junit.Test; - -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; - -public class ExtendedDeduplicatorTest { - - @Test - public void testNormal() throws Exception { - List<Publishment> pubs = loadEntities("/router/publishments-extended-deduplicator.json", Publishment.class); - - AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null); - AlertStreamEvent event1 = createWithStreamDef("extended_dedup_host1", "extended_dedup_testapp1", "OPEN"); - AlertStreamEvent event2 = createWithStreamDef("extended_dedup_host2", "extended_dedup_testapp1", "OPEN"); - AlertStreamEvent event3 = createWithStreamDef("extended_dedup_host2", "extended_dedup_testapp2", "CLOSE"); - - Assert.assertNotNull(plugin.dedup(event1)); - Assert.assertNull(plugin.dedup(event2)); - Assert.assertNotNull(plugin.dedup(event3)); - - } - - private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception { - ObjectMapper objectMapper = new ObjectMapper(); - JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz)); - List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type); - return l; - } - - private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) { - AlertStreamEvent alert = new AlertStreamEvent(); - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("perfmon_cpu_host_check"); - alert.setPolicyId(policy.getName()); - alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[] {appName, hostname, state}); - alert.setStreamId("testAlertStream"); - alert.setCreatedBy(this.toString()); - - // build stream definition - StreamDefinition sd = new StreamDefinition(); - StreamColumn appColumn = new StreamColumn(); - appColumn.setName("appname"); - appColumn.setType(StreamColumn.Type.STRING); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("hostname"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn)); - - alert.setSchema(sd); - return alert; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java deleted file mode 100644 index 31f744e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/SimpleEmbedMongo.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.mongodb.MongoClient; - -import de.flapdoodle.embed.mongo.MongodExecutable; -import de.flapdoodle.embed.mongo.MongodProcess; -import de.flapdoodle.embed.mongo.MongodStarter; -import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; -import de.flapdoodle.embed.mongo.config.Net; -import de.flapdoodle.embed.mongo.distribution.Version; -import de.flapdoodle.embed.process.runtime.Network; - -public class SimpleEmbedMongo { - - private static final Logger LOG = LoggerFactory.getLogger(SimpleEmbedMongo.class); - - private MongoClient client; - private MongodExecutable mongodExe; - private MongodProcess mongod; - - public void start() throws Exception { - MongodStarter starter = MongodStarter.getDefaultInstance(); - mongodExe = starter.prepare(new MongodConfigBuilder().version(Version.V3_2_1) - .net(new Net(27017, Network.localhostIsIPv6())).build()); - mongod = mongodExe.start(); - - client = new MongoClient("localhost"); - } - - public void shutdown() { - - if (mongod != null) { - try { - mongod.stop(); - } - catch (IllegalStateException e) { - // catch this exception for the unstable stopping mongodb - // reason: the exception is usually thrown out with below message format when stop() returns null value, - // but actually this should have been captured in ProcessControl.stopOrDestroyProcess() by destroying - // the process ultimately - if (e.getMessage() != null && e.getMessage().matches("^Couldn't kill.*process!.*")) { - // if matches, do nothing, just ignore the exception - } else { - LOG.warn(String.format("Ignored error for stopping mongod process, see stack trace: %s", ExceptionUtils.getStackTrace(e))); - } - } - mongodExe.stop(); - } - } - - public MongoClient getMongoClient() { - return client; - } - - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java deleted file mode 100644 index 247f332..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.dedup; - -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; -import org.junit.Ignore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.typesafe.config.Config; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@Ignore -public class TestDeduplicator extends ExtendedDeduplicator { - - public TestDeduplicator(Config config, Map<String, String> properties, List<String> customDedupFields, - String dedupStateField, DedupCache dedupCache, String publishName) { - super(config, properties, customDedupFields, dedupStateField, dedupCache, publishName); - } - - private static final Logger LOG = LoggerFactory.getLogger(TestDeduplicator.class); - - @Override - public List<AlertStreamEvent> dedup(AlertStreamEvent event) { - StreamDefinition streamDefinition = event.getSchema(); - HashMap<String, String> customFieldValues = new HashMap<>(); - String stateFiledValue = null; - for (int i = 0; i < event.getData().length; i++) { - if (i > streamDefinition.getColumns().size()) { - continue; - } - String colName = streamDefinition.getColumns().get(i).getName(); - - if (colName.equals(this.getDedupStateField())) { - stateFiledValue = event.getData()[i].toString(); - } - - // make all of the field as unique key if no custom dedup field provided - if (this.getCustomDedupFields() == null || this.getCustomDedupFields().size() <= 0) { - customFieldValues.put(colName, event.getData()[i].toString()); - } else { - for (String field : this.getCustomDedupFields()) { - if (colName.equals(field)) { - customFieldValues.put(field, event.getData()[i].toString()); - break; - } - } - } - } - LOG.info("event: " + event); - EventUniq eventkey = new EventUniq(event.getStreamId(), event.getPolicyId(), event.getCreatedTime(), customFieldValues); - LOG.info("event key: " + eventkey); - LOG.info("dedup field: " + this.getDedupStateField()); - LOG.info("dedup value: " + stateFiledValue); - List<AlertStreamEvent> result = this.getDedupCache().dedup(event, eventkey, this.getDedupStateField(), stateFiledValue, "closed"); - return result; - } - - @Override - public void setDedupIntervalMin(String intervalMin) { - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java deleted file mode 100644 index 7b1d494..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.template; - -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.coordinator.AlertDefinition; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -public class VelocityAlertTemplateEngineTest { - @Test - public void testVelocityAlertTemplate () { - AlertTemplateEngine templateEngine = new VelocityAlertTemplateEngine(); - templateEngine.init(ConfigFactory.load()); - templateEngine.register(mockPolicy("testPolicy")); - AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicy")); - Assert.assertEquals("Alert (2016-11-30 07:31:15): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " + - "exceeding thread hold: 90%. (policy: testPolicy, description: Policy for monitoring cpu usage > 90%), " + - "definition: from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " + - "select site, metric, host, role, value insert into capacityUsageAlert", event.getBody()); - Assert.assertEquals("Name Node Usage Exceed 90%, reach 98.0% now", event.getSubject()); - } - - @Test - public void testVelocityAlertTemplateWithoutTemplate () { - AlertTemplateEngine templateEngine = new VelocityAlertTemplateEngine(); - templateEngine.init(ConfigFactory.load()); - templateEngine.register(mockPolicyWithoutTemplate("testPolicyName")); - AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicyName")); - Assert.assertEquals("Message: Alert {site=test, stream=ALERT_STREAM,timestamp=2016-11-30 07:31:15,923," + - "data={site=test_cluster, role=hadoop, metric=cpu.usage, host=localhost, value=0.98}, " + - "policyId=testPolicyName, createdBy=junit, metaVersion=SAMPLE_META_VERSION} " + - "(Auto-generated alert message as template not defined in policy testPolicyName)", event.getBody()); - Assert.assertEquals("testPolicyName", event.getSubject()); - } - - private static PolicyDefinition mockPolicy (String policyId) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " + - "select site, metric, host, role, value insert into capacityUsageAlert"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM")); - pd.setOutputStreams(Collections.singletonList("capacityUsageAlert")); - pd.setName(policyId); - pd.setDescription("Policy for monitoring cpu usage > 90%"); - AlertDefinition alertDefinition = new AlertDefinition(); - alertDefinition.setSubject("Name Node Usage Exceed 90%, reach #set($usage_per = $value * 100)$usage_per% now"); - alertDefinition.setBody("Alert ($CREATED_TIME): cpu usage on $role of cluster $site at $host is $value, exceeding thread hold: 90%. " - + "(policy: $POLICY_ID, description: $POLICY_DESC), definition: $POLICY_DEFINITION"); - pd.setAlertDefinition(alertDefinition); - return pd; - } - private static PolicyDefinition mockPolicyWithoutTemplate (String policyId) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " + - "select site, metric, host, role, value insert into capacityUsageAlert"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM")); - pd.setOutputStreams(Collections.singletonList("capacityUsageAlert")); - pd.setName(policyId); - pd.setDescription("Policy for monitoring cpu usage > 90%"); - return pd; - } - - private AlertStreamEvent mockAlertEvent (String policyId) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setSiteId("test"); - event.setCreatedBy("junit"); - event.setCreatedTime(1480491075923L); - event.setPolicyId(policyId); - event.setStreamId("ALERT_STREAM"); - event.setSchema(mockAlertStreamDefinition("ALERT_STREAM")); - event.setMetaVersion("SAMPLE_META_VERSION"); - event.setTimestamp(1480491075923L); - event.setData(new Object[]{"test_cluster", "cpu.usage", "localhost", "hadoop", 0.98}); - event.ensureAlertId(); - return event; - } - - private StreamDefinition mockAlertStreamDefinition(String streamId){ - StreamDefinition streamDefinition = new StreamDefinition(); - streamDefinition.setStreamId(streamId); - streamDefinition.setSiteId("test_cluster"); - List<StreamColumn> columns = new ArrayList<>(); - StreamColumn column = new StreamColumn(); - column.setName("site"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - column = new StreamColumn(); - column.setName("metric"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - column = new StreamColumn(); - column.setName("host"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - column = new StreamColumn(); - column.setName("role"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - column = new StreamColumn(); - column.setName("value"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - - streamDefinition.setColumns(columns); - return streamDefinition; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java deleted file mode 100644 index 269cb71..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.publisher.template; - -import org.apache.velocity.exception.MethodInvocationException; -import org.apache.velocity.exception.ParseErrorException; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class VelocityTemplateParserTest { - @Test - public void testParseVelocityTemplate() { - String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time"; - VelocityTemplateParser parser = new VelocityTemplateParser(templateString); - Assert.assertEquals(5, parser.getReferenceNames().size()); - Assert.assertArrayEquals(new String[]{"category", "reason", "REASON", "source", "created_time"}, parser.getReferenceNames().toArray()); - } - - - @Test(expected = ParseErrorException.class) - public void testParseInvalidVelocityTemplate() { - String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time #if() #fi"; - VelocityTemplateParser parser = new VelocityTemplateParser(templateString); - Assert.assertEquals(5, parser.getReferenceNames().size()); - Assert.assertArrayEquals(new String[]{"category", "reason", "REASON", "source", "created_time"}, parser.getReferenceNames().toArray()); - } - - @Test - public void testValidateVelocityContext() { - String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time"; - VelocityTemplateParser parser = new VelocityTemplateParser(templateString); - Map<String,Object> context = new HashMap<>(); - context.put("category", "UNKNOWN"); - context.put("reason", "timeout"); - context.put("REASON", "IO error"); - context.put("source","localhost"); - context.put("created_time", "2016-11-30 05:52:47,053"); - parser.validateContext(context); - } - - @Test(expected = MethodInvocationException.class) - public void testValidateInvalidVelocityContext() { - String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time"; - VelocityTemplateParser parser = new VelocityTemplateParser(templateString); - parser.validateContext(new HashMap<>()); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java deleted file mode 100644 index b67f394..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher.template; - -import org.apache.velocity.Template; -import org.apache.velocity.VelocityContext; -import org.apache.velocity.app.Velocity; -import org.apache.velocity.app.VelocityEngine; -import org.apache.velocity.runtime.RuntimeConstants; -import org.apache.velocity.runtime.parser.node.ASTReference; -import org.apache.velocity.runtime.parser.node.ASTprocess; -import org.apache.velocity.runtime.resource.loader.StringResourceLoader; -import org.apache.velocity.runtime.resource.util.StringResourceRepository; -import org.apache.velocity.runtime.visitor.NodeViewMode; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -public class VelocityTemplateTest { - private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateTest.class); - - @Test - public void testVelocityTemplate() { - String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time"; - String resultString = "This alert ($category) was generated because timeout and IO error from localhost at 2016-11-30 05:52:47,053"; - VelocityEngine engine = new VelocityEngine(); - engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute"); - engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName()); - engine.setProperty(Velocity.RESOURCE_LOADER, "string"); - engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName()); - engine.addProperty("string.resource.loader.repository.static", "false"); - // engine.addProperty("runtime.references.strict", "true"); - engine.init(); - - StringResourceRepository repo = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT); - repo.putStringResource("alert_template", ""); - repo.putStringResource("alert_template", templateString); - - Assert.assertEquals(templateString, repo.getStringResource("alert_template").getBody()); - - VelocityContext context = new VelocityContext(); - context.put("reason", "timeout"); - context.put("REASON", "IO error"); - context.put("source","localhost"); - context.put("created_time", "2016-11-30 05:52:47,053"); - - Template velocityTemplate = engine.getTemplate("alert_template"); - ASTprocess data = (ASTprocess) velocityTemplate.getData(); - ReferenceContext referenceContext = new ReferenceContext(); - data.jjtAccept(referenceContext,null); - Assert.assertEquals(5, referenceContext.getReferences().size()); - StringWriter writer = new StringWriter(); - velocityTemplate.merge(context, writer); - velocityTemplate.process(); - Assert.assertEquals(resultString, writer.toString()); - } - - private class ReferenceContext extends NodeViewMode { - private List<ASTReference> references = new ArrayList<>(); - - @Override - public Object visit(ASTReference node, Object data) { - references.add(node); - return super.visit(node, data); - } - - public List<ASTReference> getReferences() { - return this.references; - } - - public List<String> getReferenceNames() { - return this.references.stream().map(ASTReference::getRootString).collect(Collectors.toList()); - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java deleted file mode 100644 index 284abc4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; - -import java.util.Map; - -/** - * Created on 8/29/16. - */ -public class CustomizedHandler implements PolicyStreamHandler { - private Collector<AlertStreamEvent> collector; - private PolicyHandlerContext context; - private Map<String, StreamDefinition> sds; - - public CustomizedHandler(Map<String, StreamDefinition> sds) { - this.sds = sds; - } - - @Override - public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { - this.collector = collector; - this.context = context; - } - - @Override - public void send(StreamEvent event) throws Exception { - AlertStreamEvent alert = new AlertStreamEvent(); - alert.setPolicyId(context.getPolicyDefinition().getName()); - alert.setSchema(sds.get(event.getStreamId())); - this.collector.emit(alert); - } - - @Override - public void close() throws Exception { - - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java deleted file mode 100755 index c9e09fd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java +++ /dev/null @@ -1,701 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.router; - -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.coordination.model.AlertBoltSpec; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers; -import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.runner.AlertBolt; -import org.apache.eagle.alert.engine.runner.TestStreamRouterBolt; -import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl; -import org.apache.eagle.common.DateTimeUtil; - -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Since 5/2/16. - */ -@SuppressWarnings( {"rawtypes", "unused"}) -public class TestAlertBolt { - - public static final String TEST_STREAM = "test-stream"; - - private static final Logger LOG = LoggerFactory.getLogger(TestAlertBolt.class); - - /** - * Following knowledge is guaranteed in - * - * @throws Exception Add test case: 2 alerts should be generated even if they are very close to each other in timestamp - * @see org.apache.eagle.alert.engine.runner.AlertBolt#execute{ - * if(!routedStreamEvent.getRoute().getTargetComponentId().equals(this.policyGroupEvaluator.getName())){ - * throw new IllegalStateException("Got event targeted to "+ routedStreamEvent.getRoute().getTargetComponentId()+" in "+this.policyGroupEvaluator.getName()); - * } - * } - */ - @Test - public void testAlertBolt() throws Exception { - final AtomicInteger alertCount = new AtomicInteger(); - final Semaphore mutex = new Semaphore(0); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - int count = 0; - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - alertCount.incrementAndGet(); - mutex.release(); - Assert.assertEquals("testAlertStream", ((PublishPartition) tuple.get(0)).getStreamId()); - Assert.assertEquals("testAlertPublish", ((PublishPartition) tuple.get(0)).getPublishId()); - AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple)); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - @Override - public void reportError(Throwable error) { - } - - }); - AlertBolt bolt = createAlertBolt(collector); - - String streamId = "cpuUsageStream"; - - // construct StreamDefinition - StreamDefinition schema = new StreamDefinition(); - schema.setStreamId(streamId); - StreamColumn column = new StreamColumn(); - column.setName("col1"); - column.setType(StreamColumn.Type.STRING); - schema.setColumns(Collections.singletonList(column)); - Map<String, StreamDefinition> sds = new HashMap<>(); - sds.put(schema.getStreamId(), schema); - - // construct StreamPartition - StreamPartition sp = new StreamPartition(); - sp.setColumns(Collections.singletonList("col1")); - sp.setStreamId(streamId); - sp.setType(StreamPartition.Type.GROUPBY); - - AlertBoltSpec spec = new AlertBoltSpec(); - spec.setVersion("version1"); - spec.setTopologyName("testTopology"); - PolicyDefinition pd = new PolicyDefinition(); - pd.setName("policy1"); - pd.setPartitionSpec(Collections.singletonList(sp)); - pd.setOutputStreams(Collections.singletonList("testAlertStream")); - pd.setInputStreams(Collections.singletonList(streamId)); - pd.setDefinition(new PolicyDefinition.Definition()); - pd.getDefinition().type = PolicyStreamHandlers.SIDDHI_ENGINE; - pd.getDefinition().value = "from cpuUsageStream[col1=='value1' OR col1=='value2'] select col1 insert into testAlertStream;"; - spec.addBoltPolicy("alertBolt1", pd.getName()); - spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<PolicyDefinition>(Arrays.asList(pd))); - spec.addPublishPartition("testAlertStream", "policy1", "testAlertPublish", null); - bolt.onAlertBoltSpecChange(spec, sds); - - // contruct GeneralTopologyContext - GeneralTopologyContext context = mock(GeneralTopologyContext.class); - int taskId = 1; - when(context.getComponentId(taskId)).thenReturn("comp1"); - when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0")); - - // construct event with "value1" - StreamEvent event1 = new StreamEvent(); - event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000); - event1.setMetaVersion("version1"); - Object[] data = new Object[] {"value1"}; - event1.setData(data); - event1.setStreamId(streamId); - PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001); - - // construct another event with "value1" - StreamEvent event2 = new StreamEvent(); - event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000); - event2.setMetaVersion("version1"); - data = new Object[] {"value2"}; - event2.setData(data); - event2.setStreamId(streamId); - PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001); - - Thread.sleep(3000); - Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default"); - Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default"); - bolt.execute(input); - bolt.execute(input2); - Assert.assertTrue("Timeout to acquire mutex in 5s", mutex.tryAcquire(2, 5, TimeUnit.SECONDS)); - Assert.assertEquals(2, alertCount.get()); - bolt.cleanup(); - } - - public static AlertBolt createAlertBolt(OutputCollector collector) { - Config config = ConfigFactory.load(); - PolicyGroupEvaluator policyGroupEvaluator = new PolicyGroupEvaluatorImpl("testPolicyGroupEvaluatorImpl"); - TestStreamRouterBolt.MockChangeService mockChangeService = new TestStreamRouterBolt.MockChangeService(); - AlertBolt bolt = new AlertBolt("alertBolt1", config, mockChangeService); - Map stormConf = new HashMap<>(); - TopologyContext topologyContext = mock(TopologyContext.class); - when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric()); - bolt.prepare(stormConf, topologyContext, collector); - return bolt; - } - - @Test - public void testMetadataMismatch() throws Exception { - AtomicInteger failedCount = new AtomicInteger(); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - int count = 0; - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - Assert.assertEquals("testAlertStream", tuple.get(0)); - AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - failedCount.incrementAndGet(); - } - - @Override - public void reportError(Throwable error) { - } - }); - AlertBolt bolt = createAlertBolt(collector); - - GeneralTopologyContext context = mock(GeneralTopologyContext.class); - int taskId = 1; - when(context.getComponentId(taskId)).thenReturn("comp1"); - when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0")); - // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange) - PartitionedEvent pe = new PartitionedEvent(); - pe.setPartitionKey(1); - pe.setPartition(createPartition()); - StreamEvent streamEvent = new StreamEvent(); - streamEvent.setStreamId("test-stream"); - streamEvent.setTimestamp(System.currentTimeMillis()); - pe.setEvent(streamEvent); - - PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt); - byte[] serializedEvent = peSer.serialize(pe); - Tuple input = new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default"); - bolt.execute(input); - - Assert.assertEquals(1, failedCount.get()); - failedCount.set(0); - - { - // case 2: metadata loaded but empty (AlertBoltSepc) - bolt.onAlertBoltSpecChange(new AlertBoltSpec(), new HashMap()); - - bolt.execute(input); - Assert.assertEquals(1, failedCount.get()); - failedCount.set(0); - } - - // case 3: metadata loaded but mismatched - { - Map<String, StreamDefinition> sds = new HashMap(); - StreamDefinition sdTest = new StreamDefinition(); - String streamId = "pd-test"; // here streamId is different from the one "test-stream" (StreamEvent) - sdTest.setStreamId(streamId); - sds.put(sdTest.getStreamId(), sdTest); - - AlertBoltSpec boltSpecs = new AlertBoltSpec(); - boltSpecs.setVersion("specVersion-" + System.currentTimeMillis()); - - PolicyDefinition def = new PolicyDefinition(); - def.setName("policy-definition"); - def.setInputStreams(Arrays.asList(streamId)); - def.setOutputStreams(Arrays.asList("output")); - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE); - definition.setValue("PT0M,provided,1,host,host1"); - def.setDefinition(definition); - - boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); - - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - bolt.execute(input); - Assert.assertEquals(1, failedCount.get()); - failedCount.set(0); - } - } - - //TODO: no data alert failed, need to check when no data alert merged. - @Test - public void testMetaversionConflict() throws Exception { - AtomicInteger failedCount = new AtomicInteger(); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - int count = 0; - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - Assert.assertEquals("testAlertStream", tuple.get(0)); - AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - failedCount.incrementAndGet(); - } - - @Override - public void reportError(Throwable error) { - } - }); - AlertBolt bolt = createAlertBolt(collector); - - Map<String, StreamDefinition> sds = new HashMap(); - StreamDefinition sdTest = new StreamDefinition(); - String streamId = "test-stream"; - sdTest.setStreamId(streamId); - sds.put(sdTest.getStreamId(), sdTest); - - AlertBoltSpec boltSpecs = new AlertBoltSpec(); - boltSpecs.setVersion("spec_version_" + System.currentTimeMillis()); - boltSpecs.setTopologyName("alertUnitTopology_1"); - - PolicyDefinition def = new PolicyDefinition(); - def.setName("policy-definition"); - def.setInputStreams(Arrays.asList(streamId)); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType(PolicyStreamHandlers.NO_DATA_ALERT_ENGINE); - definition.setValue("PT0M,provided,1,host,host1"); - def.setDefinition(definition); - def.setPartitionSpec(Arrays.asList(createPartition())); - def.setOutputStreams(Arrays.asList("out")); - - boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); - bolt = createAlertBolt(collector); - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - Tuple input = createTuple(bolt, boltSpecs.getVersion()); - bolt.execute(input); - - // Sleep 10s to wait thread in bolt.execute() to finish works - Thread.sleep(10000); - - Assert.assertEquals(0, failedCount.get()); - failedCount.set(0); - - } - - private Tuple createTuple(AlertBolt bolt, String version) throws IOException { - GeneralTopologyContext context = mock(GeneralTopologyContext.class); - int taskId = 1; - when(context.getComponentId(taskId)).thenReturn("comp1"); - when(context.getComponentOutputFields("comp1", TEST_STREAM)).thenReturn(new Fields("f0")); - // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange) - PartitionedEvent pe = new PartitionedEvent(); - pe.setPartitionKey(1); - pe.setPartition(createPartition()); - StreamEvent streamEvent = new StreamEvent(); - streamEvent.setStreamId(TEST_STREAM); - streamEvent.setTimestamp(System.currentTimeMillis()); - streamEvent.setMetaVersion(version); - pe.setEvent(streamEvent); - - PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt); - byte[] serializedEvent = peSer.serialize(pe); - return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, TEST_STREAM); - } - - private StreamPartition createPartition() { - StreamPartition sp = new StreamPartition(); - sp.setStreamId(TEST_STREAM); - sp.setType(StreamPartition.Type.GROUPBY); - return sp; - } - - @Test - public void testExtendDefinition() throws IOException { - PolicyDefinition def = new PolicyDefinition(); - def.setName("policy-definition"); - def.setInputStreams(Arrays.asList(TEST_STREAM)); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE); - definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler"); - definition.setValue("PT0M,plain,1,host,host1"); - def.setDefinition(definition); - def.setPartitionSpec(Arrays.asList(createPartition())); - - AlertBoltSpec boltSpecs = new AlertBoltSpec(); - - AtomicBoolean recieved = new AtomicBoolean(false); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - recieved.set(true); - return Collections.emptyList(); - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - @Override - public void reportError(Throwable error) { - } - }); - AlertBolt bolt = createAlertBolt(collector); - - boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); - boltSpecs.setVersion("spec_" + System.currentTimeMillis()); - // stream def map - Map<String, StreamDefinition> sds = new HashMap(); - StreamDefinition sdTest = new StreamDefinition(); - sdTest.setStreamId(TEST_STREAM); - sds.put(sdTest.getStreamId(), sdTest); - - boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null); - - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - // how to assert - Tuple t = createTuple(bolt, boltSpecs.getVersion()); - - bolt.execute(t); - - Assert.assertTrue(recieved.get()); - } - - @Test - public void testStreamDefinitionChange() throws IOException { - PolicyDefinition def = new PolicyDefinition(); - def.setName("policy-definition"); - def.setInputStreams(Arrays.asList(TEST_STREAM)); - - PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); - definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE); - definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler"); - definition.setValue("PT0M,plain,1,host,host1"); - def.setDefinition(definition); - def.setPartitionSpec(Arrays.asList(createPartition())); - - AlertBoltSpec boltSpecs = new AlertBoltSpec(); - - AtomicBoolean recieved = new AtomicBoolean(false); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - recieved.set(true); - return Collections.emptyList(); - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - @Override - public void reportError(Throwable error) { - } - }); - AlertBolt bolt = createAlertBolt(collector); - - boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); - boltSpecs.setVersion("spec_" + System.currentTimeMillis()); - // stream def map - Map<String, StreamDefinition> sds = new HashMap(); - StreamDefinition sdTest = new StreamDefinition(); - sdTest.setStreamId(TEST_STREAM); - sds.put(sdTest.getStreamId(), sdTest); - - boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null); - - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - LOG.info("Update stream"); - sds = new HashMap(); - sdTest = new StreamDefinition(); - sdTest.setStreamId(TEST_STREAM); - sds.put(sdTest.getStreamId(), sdTest); - sdTest.setDescription("update the stream"); - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - LOG.info("Update stream & update policy"); - sds = new HashMap(); - sdTest = new StreamDefinition(); - sdTest.setStreamId(TEST_STREAM); - sds.put(sdTest.getStreamId(), sdTest); - sdTest.setDescription("update the stream & update policy"); - - def = new PolicyDefinition(); - def.setName("policy-definition"); - def.setInputStreams(Arrays.asList(TEST_STREAM)); - - definition = new PolicyDefinition.Definition(); - definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE); - definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler"); - definition.setValue("PT0M,plain,1,host,host2"); - def.setDefinition(definition); - def.setPartitionSpec(Arrays.asList(createPartition())); - boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); - - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - LOG.info("No any change"); - sds = new HashMap(); - sdTest = new StreamDefinition(); - sdTest.setStreamId(TEST_STREAM); - sds.put(sdTest.getStreamId(), sdTest); - sdTest.setDescription("update the stream"); - bolt.onAlertBoltSpecChange(boltSpecs, sds); - - // how to assert - Tuple t = createTuple(bolt, boltSpecs.getVersion()); - - bolt.execute(t); - - Assert.assertTrue(recieved.get()); - } - - @Test @Ignore - public void testMultiStreamDefinition() throws Exception { - final AtomicInteger alertCount = new AtomicInteger(); - final Semaphore mutex = new Semaphore(0); - OutputCollector collector = new OutputCollector(new IOutputCollector() { - int count = 0; - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - System.out.println("=====output collector=========="); - alertCount.incrementAndGet(); - mutex.release(); - Assert.assertTrue("symptomaticAlertOutputStream".equals((String) tuple.get(0)) - || "deviceDownAlertStream".equals((String) tuple.get(0))); - AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); - System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); - - System.out.println("**********output collector end***********"); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - @Override - public void reportError(Throwable error) { - } - }); - - - AlertBolt bolt = createAlertBolt(collector); - - // construct StreamPartition - StreamPartition sp = new StreamPartition(); - sp.setColumns(Collections.singletonList("col1")); - sp.setStreamId("correlatedStream"); - sp.setType(StreamPartition.Type.GROUPBY); - - pushAlertBoltSpec(sp, bolt); - - // now emit - // contruct GeneralTopologyContext - GeneralTopologyContext context = mock(GeneralTopologyContext.class); - int taskId = 1; - when(context.getComponentId(taskId)).thenReturn("comp1"); - when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0")); - - long base = System.currentTimeMillis(); - int i = 0; - String linkedSwitch = "lvs-ra-01"; - - // construct event with "value1" - StreamEvent event1 = new StreamEvent(); - event1.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:00") * 1000); - event1.setMetaVersion("version1"); - Object[] data = new Object[] { base , "child-"+ (i++), "", linkedSwitch}; - event1.setData(data); - event1.setStreamId("correlatedStream"); - PartitionedEvent partitionedEvent1 = new PartitionedEvent(event1, sp, 1001); - - // construct another event with "value1" - StreamEvent event2 = new StreamEvent(); - event2.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:05:00") * 1000); - event2.setMetaVersion("version1"); - data = new Object[] { base , "child-"+ (i++), "", linkedSwitch}; - event2.setData(data); - event2.setStreamId("correlatedStream"); - PartitionedEvent partitionedEvent2 = new PartitionedEvent(event2, sp, 1001); - - Tuple input = new TupleImpl(context, Collections.singletonList(partitionedEvent1), taskId, "default"); - Tuple input2 = new TupleImpl(context, Collections.singletonList(partitionedEvent2), taskId, "default"); - bolt.execute(input); - bolt.execute(input2); - Assert.assertTrue("Timeout to acquire mutex in 10s", mutex.tryAcquire(1, 10, TimeUnit.SECONDS)); - Assert.assertEquals(3, alertCount.get()); - bolt.cleanup(); - } - - private void pushAlertBoltSpec(StreamPartition sp, AlertBolt bolt) { - Map<String, StreamDefinition> sds = new HashMap<>(); - sds.put("correlatedStream", createCorrelateStream("correlatedStream")); - sds.put("symptomaticAlertOutputStream", createCorrelateStream("symptomaticAlertOutputStream")); // output of updated correlatedStream - sds.put("deviceDownAlertStream", createCorrelateStream("deviceDownAlertStream")); - - PolicyDefinition pd = new PolicyDefinition(); - pd.setName("network_symptomatic"); - pd.setInputStreams(Arrays.asList("correlatedStream")); - pd.setOutputStreams(Arrays.asList("deviceDownAlertStream", "symptomaticAlertOutputStream")); - - pd.setPartitionSpec(Arrays.asList(sp)); - - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setType(PolicyStreamHandlers.SIDDHI_ENGINE); - def.setValue("from correlatedStream#window.externalTime(timestamp, 3 min) select UUID() as docId, linkedSwitch, '' as parentKey, timestamp group by linkedSwitch having count() > 0 insert into deviceDownAlertStream; " + - " from correlatedStream#window.externalTime(timestamp, 3 min) as left join deviceDownAlertStream#window.time(3 min) as right on left.linkedSwitch == right.linkedSwitch" + - " select left.docId, left.timestamp, left.linkedSwitch, right.docId as parentKey insert into symptomaticAlertOutputStream;"); - pd.setDefinition(def); - - - AlertBoltSpec spec = new AlertBoltSpec(); - spec.setVersion("version1"); - spec.setTopologyName("testTopology"); - spec.addBoltPolicy("alertBolt1", pd.getName()); - spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(Arrays.asList(pd))); - - bolt.onAlertBoltSpecChange(spec, sds); - } - - private StreamDefinition createCorrelateStream(String streamId) { - // construct StreamDefinition - StreamDefinition schema = new StreamDefinition(); - schema.setStreamId(streamId); - List<StreamColumn> columns = new LinkedList<>(); - { - StreamColumn column = new StreamColumn(); - column.setName("timestamp"); - column.setType(StreamColumn.Type.LONG); - columns.add(column); - } - { - StreamColumn column = new StreamColumn(); - column.setName("docId"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - } - { - StreamColumn column = new StreamColumn(); - column.setName("parentKey"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - } - { - StreamColumn column = new StreamColumn(); - column.setName("linkedSwitch"); - column.setType(StreamColumn.Type.STRING); - columns.add(column); - } - - schema.setColumns(columns); - return schema; - } - -} -
