http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java deleted file mode 100644 index 21872b9..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockSampleMetadataFactory.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.mock; - -import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; - -import java.util.*; -import java.util.concurrent.ThreadLocalRandom; - -@SuppressWarnings("serial") -public class MockSampleMetadataFactory { - private static MockStreamMetadataService mockStreamMetadataServiceInstance = null; - - public static MockStreamMetadataService createSingletonMetadataServiceWithSample() { - if (mockStreamMetadataServiceInstance != null) { - return mockStreamMetadataServiceInstance; - } - mockStreamMetadataServiceInstance = new MockStreamMetadataService(); - mockStreamMetadataServiceInstance.registerStream("sampleStream", createSampleStreamDefinition("sampleStream")); - mockStreamMetadataServiceInstance.registerStream("sampleStream_1", createSampleStreamDefinition("sampleStream_1")); - mockStreamMetadataServiceInstance.registerStream("sampleStream_2", createSampleStreamDefinition("sampleStream_2")); - mockStreamMetadataServiceInstance.registerStream("sampleStream_3", createSampleStreamDefinition("sampleStream_3")); - mockStreamMetadataServiceInstance.registerStream("sampleStream_4", createSampleStreamDefinition("sampleStream_4")); - return mockStreamMetadataServiceInstance; - } - - public static StreamDefinition createSampleStreamDefinition(String streamId) { - StreamDefinition sampleStreamDefinition = new StreamDefinition(); - sampleStreamDefinition.setStreamId(streamId); - sampleStreamDefinition.setTimeseries(true); - sampleStreamDefinition.setValidate(true); - sampleStreamDefinition.setDescription("Schema for " + streamId); - List<StreamColumn> streamColumns = new ArrayList<>(); - - streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); - streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build()); - streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build()); - streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build()); - streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()); - sampleStreamDefinition.setColumns(streamColumns); - return sampleStreamDefinition; - } - - /** - * By default window period is: PT1m - * - * @param streamId - * @return - */ - public static StreamSortSpec createSampleStreamSortSpec(String streamId) { - StreamSortSpec streamSortSpec = new StreamSortSpec(); -// streamSortSpec.setColumn("timestamp"); -// streamSortSpec.setStreamId(streamId); - streamSortSpec.setWindowMargin(1000); - streamSortSpec.setWindowPeriod("PT1m"); - return streamSortSpec; - } - - public static StreamSortSpec createSampleStreamSortSpec(String streamId, String period, int margin) { - StreamSortSpec streamSortSpec = new StreamSortSpec(); -// streamSortSpec.setColumn("timestamp"); -// streamSortSpec.setStreamId(streamId); - streamSortSpec.setWindowMargin(margin); - streamSortSpec.setWindowPeriod(period); - return streamSortSpec; - } - - /** - * Policy: from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream; - * - * @return PolicyDefinition[from sampleStream_1[name == "cpu" and value > 50.0] select name, host, flag, value insert into outputStream;] - */ - public static PolicyDefinition createSingleMetricSamplePolicy() { - String definePolicy = "from sampleStream_1[name == \"cpu\" and value > 50.0] select name, host, flag, value insert into outputStream;"; - PolicyDefinition policyDefinition = new PolicyDefinition(); - policyDefinition.setName("SamplePolicyForTest"); - policyDefinition.setInputStreams(Arrays.asList("sampleStream_1")); - policyDefinition.setOutputStreams(Arrays.asList("outputStream")); - policyDefinition.setDefinition(new PolicyDefinition.Definition( - PolicyStreamHandlers.SIDDHI_ENGINE, - definePolicy - )); - policyDefinition.setPartitionSpec(Arrays.asList(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name")))); - return policyDefinition; - } - - public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) { - StreamPartition streamPartition = new StreamPartition(); - streamPartition.setStreamId(streamId); - streamPartition.setColumns(new ArrayList<>(groupByField)); - streamPartition.setType(StreamPartition.Type.GROUPBY); - StreamSortSpec streamSortSpec = new StreamSortSpec(); - streamSortSpec.setWindowPeriod("PT30m"); - streamSortSpec.setWindowMargin(10000); - streamPartition.setSortSpec(streamSortSpec); - return streamPartition; - } - - public static StreamRouterSpec createSampleStreamRouteSpec(String streamId, String groupByField, List<String> targetEvaluatorIds) { - List<WorkSlot> slots = Arrays.asList(targetEvaluatorIds.stream().map((t) -> { - return new WorkSlot("sampleTopology", t); - }).toArray(WorkSlot[]::new)); - StreamRouterSpec streamRouteSpec = new StreamRouterSpec(); - streamRouteSpec.setStreamId(streamId); - streamRouteSpec.setPartition(createSampleStreamGroupbyPartition(streamId, Arrays.asList(groupByField))); - streamRouteSpec.setTargetQueue(Arrays.asList(new PolicyWorkerQueue(slots))); - return streamRouteSpec; - } - - public static StreamRouterSpec createSampleStreamRouteSpec(List<String> targetEvaluatorIds) { - return createSampleStreamRouteSpec("sampleStream_1", "name", targetEvaluatorIds); - } - - /** - * GROUPBY_sampleStream_1_ON_name - * - * @param targetEvaluatorIds - * @return - */ - public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_1_BY_name(List<String> targetEvaluatorIds) { - return createSampleStreamRouteSpec("sampleStream_1", "name", targetEvaluatorIds); - } - - public static StreamRouterSpec createRouteSpec_GROUP_sampleStream_2_BY_name(List<String> targetEvaluatorIds) { - return createSampleStreamRouteSpec("sampleStream_2", "name", targetEvaluatorIds); - } - - public static PartitionedEvent createSimpleStreamEvent() { - StreamEvent event = null; - try { - event = StreamEvent.builder() - .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition("sampleStream_1")) - .streamId("sampleStream_1") - .timestamep(System.currentTimeMillis()) - .attributes(new HashMap<String, Object>() {{ - put("name", "cpu"); - put("value", 60.0); - put("unknown", "unknown column value"); - }}).build(); - } catch (StreamNotDefinedException e) { - e.printStackTrace(); - } - PartitionedEvent pEvent = new PartitionedEvent(); - pEvent.setEvent(event); - return pEvent; - } - - private final static String[] SAMPLE_STREAM_NAME_OPTIONS = new String[] { - "cpu", "memory", "disk", "network" - }; - - private final static String[] SAMPLE_STREAM_HOST_OPTIONS = new String[] { - "localhost_1", "localhost_2", "localhost_3", "localhost_4" - }; - - private final static Boolean[] SAMPLE_STREAM_FLAG_OPTIONS = new Boolean[] { - true, false - }; - - private final static Double[] SAMPLE_STREAM_VALUE_OPTIONS = new Double[] { - -0.20, 40.4, 50.5, 60.6, 10000.1 - }; - private final static String[] SAMPLE_STREAM_ID_OPTIONS = new String[] { - "sampleStream_1", "sampleStream_2", "sampleStream_3", "sampleStream_4", - }; - private final static Random RANDOM = ThreadLocalRandom.current(); - - public static StreamEvent createRandomStreamEvent() { - return createRandomStreamEvent(SAMPLE_STREAM_ID_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_ID_OPTIONS.length)]); - } - - public static StreamEvent createRandomStreamEvent(String streamId) { - return createRandomStreamEvent(streamId, System.currentTimeMillis()); - } - - private final static Long[] TIME_DELTA_OPTIONS = new Long[] { - -30000L, -10000L, -5000L, -1000L, 0L, 1000L, 5000L, 10000L, 30000L - }; - - public static StreamEvent createRandomOutOfTimeOrderStreamEvent(String streamId) { - StreamEvent event = createRandomStreamEvent(streamId); - event.setTimestamp(System.currentTimeMillis() + TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]); - return event; - } - - - public static PartitionedEvent createRandomOutOfTimeOrderEventGroupedByName(String streamId) { - StreamEvent event = createRandomStreamEvent(streamId); - event.setTimestamp(System.currentTimeMillis() + TIME_DELTA_OPTIONS[RANDOM.nextInt(TIME_DELTA_OPTIONS.length)]); - return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode()); - } - - public static PartitionedEvent createPartitionedEventGroupedByName(String streamId, long timestamp) { - StreamEvent event = createRandomStreamEvent(streamId); - event.setTimestamp(timestamp); - return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode()); - } - - public static PartitionedEvent createRandomSortedEventGroupedByName(String streamId) { - StreamEvent event = createRandomStreamEvent(streamId); - event.setTimestamp(System.currentTimeMillis()); - return new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode()); - } - - public static StreamEvent createRandomStreamEvent(String streamId, long timestamp) { - StreamEvent event; - try { - event = StreamEvent.builder() - .schema(MockSampleMetadataFactory.createSingletonMetadataServiceWithSample().getStreamDefinition(streamId)) - .streamId(streamId) - .timestamep(timestamp) - .attributes(new HashMap<String, Object>() {{ - put("name", SAMPLE_STREAM_NAME_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_NAME_OPTIONS.length)]); - put("value", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]); - put("host", SAMPLE_STREAM_HOST_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_HOST_OPTIONS.length)]); - put("flag", SAMPLE_STREAM_FLAG_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_FLAG_OPTIONS.length)]); -// put("value1", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]); -// put("value2", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]); -// put("value3", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]); -// put("value4", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]); -// put("value5", SAMPLE_STREAM_VALUE_OPTIONS[RANDOM.nextInt(SAMPLE_STREAM_VALUE_OPTIONS.length)]); - put("unknown", "unknown column value"); - }}).build(); - } catch (StreamNotDefinedException e) { - throw new IllegalStateException(e.getMessage(), e); - } - return event; - } - - public static PartitionedEvent createRandomPartitionedEvent(String streamId, long timestamp) { - StreamEvent event = createRandomStreamEvent(streamId, timestamp); - PartitionedEvent partitionedEvent = new PartitionedEvent(event, createSampleStreamGroupbyPartition(streamId, Arrays.asList("name")), event.getData()[0].hashCode()); - return partitionedEvent; - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java deleted file mode 100755 index b865422..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamCollector.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.mock; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; - -public class MockStreamCollector implements Collector<AlertStreamEvent> { - @SuppressWarnings("unused") - private final static Logger LOG = LoggerFactory.getLogger(MockStreamCollector.class); - private List<AlertStreamEvent> cache; - - public MockStreamCollector() { - cache = new LinkedList<>(); - } - - public void emit(AlertStreamEvent event) { - cache.add(event); - // LOG.info("PartitionedEventCollector received: {}",event); - } - - public void clear() { - cache.clear(); - } - - public List<AlertStreamEvent> get() { - return cache; - } - - public int size() { - return cache.size(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java deleted file mode 100644 index 73c39c4..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamMetadataService.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.mock; - -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamNotDefinedException; - -import java.util.HashMap; -import java.util.Map; - -public class MockStreamMetadataService { - private final Map<String, StreamDefinition> streamSchemaMap = new HashMap<>(); - - public StreamDefinition getStreamDefinition(String streamId) throws StreamNotDefinedException { - if (streamSchemaMap.containsKey(streamId)) { - return streamSchemaMap.get(streamId); - } else { - throw new StreamNotDefinedException(streamId); - } - } - - public void registerStream(String streamId, StreamDefinition schema) { - streamSchemaMap.put(streamId, schema); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java deleted file mode 100644 index 9ab4c24..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/mock/MockStreamReceiver.java +++ /dev/null @@ -1,80 +0,0 @@ -package org.apache.eagle.alert.engine.mock; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.utils.AlertConstants; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -@SuppressWarnings("serial") -public class MockStreamReceiver extends BaseRichSpout { - private final static Logger LOG = LoggerFactory.getLogger(MockStreamReceiver.class); - private SpoutOutputCollector collector; - private List<String> outputStreamIds; - - public MockStreamReceiver(int partition) { - outputStreamIds = new ArrayList<>(partition); - for (int i = 0; i < partition; i++) { - outputStreamIds.add(StreamIdConversion.generateStreamIdByPartition(i)); - } - } - - @SuppressWarnings("rawtypes") - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - } - - @Override - public void close() { - } - - /** - * This unit test is not to mock the end2end logic of correlation spout, - * but simply generate some sample data for following bolts testing - */ - @Override - public void nextTuple() { - PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1"); - LOG.info("Receive {}", event); - collector.emit(outputStreamIds.get( - // group by the first field in event i.e. name - (int) (event.getPartitionKey() % outputStreamIds.size())), - Collections.singletonList(event)); - Utils.sleep(500); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (String streamId : outputStreamIds) { - declarer.declareStream(streamId, new Fields(AlertConstants.FIELD_0)); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java deleted file mode 100644 index 0446b5e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeBatchWindow.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.nodata; - -import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeBatchWindow; -import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.*; - -public class TestDistinctValuesInTimeBatchWindow { - - private static final String inputStream = "testInputStream"; - - private NoDataPolicyTimeBatchHandler handler; - - @Before - public void setup() { - handler = mock(NoDataPolicyTimeBatchHandler.class); - } - - @After - public void teardown() { - } - - @Test @Ignore - public void testNormal() throws Exception { - // wisb is null since it is dynamic mode - DistinctValuesInTimeBatchWindow window = new DistinctValuesInTimeBatchWindow(handler, 5 * 1000, null); - - long now = System.currentTimeMillis(); - - // handler.compareAndEmit(anyObject(), anyObject(), anyObject()); - - // event time - sendEventToWindow(window, now, "host1", 95.5); - - Thread.sleep(6000); - - sendEventToWindow(window, now, "host1", 91.0); - sendEventToWindow(window, now, "host2", 95.5); - sendEventToWindow(window, now, "host2", 97.1); - - Thread.sleep(3000); - - sendEventToWindow(window, now, "host1", 90.7); - - Thread.sleep(4000); - - sendEventToWindow(window, now, "host1", 90.7); - - Thread.sleep(3000); - - verify(handler, times(3)).compareAndEmit(anyObject(), anyObject(), anyObject()); - } - - private void sendEventToWindow(DistinctValuesInTimeBatchWindow window, long ts, String host, double value) { - window.send(buildStreamEvent(ts, host, value), host, ts); - } - - private StreamEvent buildStreamEvent(long ts, String host, double value) { - StreamEvent e = new StreamEvent(); - e.setData(new Object[] {ts, host, value}); - e.setStreamId(inputStream); - e.setTimestamp(ts); - return e; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java deleted file mode 100644 index d23abcb..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestDistinctValuesInTimeWindow.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.nodata; - -import org.apache.eagle.alert.engine.evaluator.nodata.DistinctValuesInTimeWindow; -import org.junit.Test; - -import java.util.Iterator; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -/** - * Since 6/28/16. - */ -public class TestDistinctValuesInTimeWindow { - @Test - public void test() { - DistinctValuesInTimeWindow window = new DistinctValuesInTimeWindow(60 * 1000); - window.send("1", 0); - window.send("2", 1000); - window.send("3", 1000); - window.send("1", 30000); - window.send("2", 50000); - window.send("1", 62000); - Map<Object, Long> values = window.distinctValues(); - System.out.println(values); - } - - @Test - public void testSort() { - SortedMap<DistinctValuesInTimeWindow.ValueAndTime, DistinctValuesInTimeWindow.ValueAndTime> timeSortedMap = - new TreeMap<>(new DistinctValuesInTimeWindow.ValueAndTimeComparator()); - DistinctValuesInTimeWindow.ValueAndTime vt1 = new DistinctValuesInTimeWindow.ValueAndTime("1", 0); - timeSortedMap.put(vt1, vt1); - DistinctValuesInTimeWindow.ValueAndTime vt2 = new DistinctValuesInTimeWindow.ValueAndTime("2", 1000); - timeSortedMap.put(vt2, vt2); - DistinctValuesInTimeWindow.ValueAndTime vt3 = new DistinctValuesInTimeWindow.ValueAndTime("3", 1000); - timeSortedMap.put(vt3, vt3); - timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("1", 0)); - DistinctValuesInTimeWindow.ValueAndTime vt4 = new DistinctValuesInTimeWindow.ValueAndTime("1", 30000); - timeSortedMap.put(vt4, vt4); - Iterator<?> it = timeSortedMap.entrySet().iterator(); - while (it.hasNext()) { - System.out.println(it.next()); - } - timeSortedMap.remove(new DistinctValuesInTimeWindow.ValueAndTime("2", 1000)); - DistinctValuesInTimeWindow.ValueAndTime vt5 = new DistinctValuesInTimeWindow.ValueAndTime("2", 50000); - timeSortedMap.put(vt5, vt5); - DistinctValuesInTimeWindow.ValueAndTime vt6 = new DistinctValuesInTimeWindow.ValueAndTime("1", 62000); - timeSortedMap.put(vt6, vt6); - it = timeSortedMap.entrySet().iterator(); - while (it.hasNext()) { - System.out.println(it.next()); - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java deleted file mode 100644 index 79b939c..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestEventTable.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.nodata; - -import org.junit.Test; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.output.StreamCallback; -import org.wso2.siddhi.core.util.EventPrinter; - -/** - * Since 6/27/16. - */ -public class TestEventTable { - @Test - public void test() throws Exception { - ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( - "define stream expectStream (key string, src string);" + - "define stream appearStream (key string, src string);" + - "define table expectTable (key string, src string);" + - "from expectStream insert into expectTable;" + - "from appearStream[(expectTable.key==key) in expectTable] insert into outputStream;" - ); - - runtime.addCallback("outputStream", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - } - }); - - runtime.start(); - runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {"host1", "expectStream"}); - Thread.sleep(2000); - runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[] {"host2", "expectStream"}); - Thread.sleep(2000); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java deleted file mode 100644 index fe70630..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataAlert.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.nodata; - -import org.junit.Test; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.output.StreamCallback; -import org.wso2.siddhi.core.util.EventPrinter; - -/** - * Since 6/27/16. - */ -public class TestNoDataAlert { - @Test - public void test() throws Exception { - String[] expectHosts = new String[] {"host_1", "host_2", "host_3", "host_4", "host_5", "host_6", "host_7", "host_8"}; -// String[] appearHosts = new String[]{"host_6","host_7","host_8"}; -// String[] noDataHosts = new String[]{"host_1","host_2","host_3","host_4","host_5"}; - - ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( - "define stream appearStream (key string, src string);" + - "define stream expectStream (key string, src string);" + - "define table expectTable (key string, src string);" + - "define trigger fiveSecTriggerStream at every 1 sec;" + - "define trigger initAppearTriggerStream at 'start';" + - "from expectStream insert into expectTable;" + - "from fiveSecTriggerStream join expectTable insert into triggerExpectStream;" + - "from initAppearTriggerStream join expectTable insert into initAppearStream;" -// "from triggerExpectStream as l left outer join appearStream#window.time(5 sec) as r on l.key == r.key select l.key as k1,r.key as k2 insert current events into joinStream;" + -// "from joinStream[k2 is null] select k1 insert current events into missingStream;" - ); - -// ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( -// "define stream appearStream (key string, src string);"+ -// "define stream expectStream (key string, src string);"+ -// "define table expectTable (key string, src string);"+ -// "from expectStream insert into expectTable;"+ -// "from appearStream#window.time(10 sec) as l right outer join expectTable as r on l.key == r.key select r.key as k2, l.key as k1 insert current events into joinStream;" + -// "from joinStream[k1 is null] select k2 insert current events into missingStream;" -//// "from joinStream insert into missingStream;" -// -// ); - - runtime.addCallback("initAppearStream", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - } - }); - - runtime.start(); - for (String host : expectHosts) { - runtime.getInputHandler("expectStream").send(System.currentTimeMillis(), new Object[] {host, "expectStream"}); - } - -// for(String host:appearHosts) { -// runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"}); -// } - - Thread.sleep(5000); - -// for(String host:appearHosts) { -// runtime.getInputHandler("appearStream").send(System.currentTimeMillis(), new Object[]{host,"inStream"}); -// } -// Thread.sleep(10000); - } - - /** - * only alert when the successive 2 events has number of missing blocks changed - * from every a = hadoopJmxMetricEventStream[ component=="namenode" and metric == "hadoop.namenode.dfs.missingblocks"] -> b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and convert(b.value, "long") > convert(a.value, "long") ] select b.metric as metric, b.host as host, b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, b.site as site insert into tmp; - */ - @Test - public void testMissingBlock() throws Exception { - ExecutionPlanRuntime runtime = new SiddhiManager().createExecutionPlanRuntime( - "define stream hadoopJmxMetricEventStream (component string, metric string, host string, site string, value double, timestamp long);" + - "from every a = hadoopJmxMetricEventStream[ component==\"namenode\" and metric == \"hadoop.namenode.dfs.missingblocks\"] -> " + - "b = hadoopJmxMetricEventStream[b.component==a.component and b.metric==a.metric and b.host==a.host and " + - "convert(b.value, \"long\") > convert(a.value, \"long\") ] select b.metric as metric, b.host as host, " + - "b.value as newNumOfMissingBlocks, a.value as oldNumOfMissingBlocks, b.timestamp as timestamp, b.component as component, " + - "b.site as site insert into outputStream;" - ); - - runtime.addCallback("outputStream", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - } - }); - - runtime.start(); - runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 12.0, 123000L}); - runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 13.0, 123100L}); - runtime.getInputHandler("hadoopJmxMetricEventStream").send(System.currentTimeMillis(), new Object[] {"namenode", "hadoop.namenode.dfs.missingblocks", "host1", "site1", 16.0, 123200L}); - - - Thread.sleep(5000); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java deleted file mode 100644 index 5564b90..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyHandler.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.nodata; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Since 6/29/16. - */ -public class TestNoDataPolicyHandler { - private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyHandler.class); - private static final String inputStream = "testInputStream"; - private static final String outputStream = "testOutputStream"; - - @Test - public void test() throws Exception { - test(buildPolicyDef_provided()); - test(buildPolicyDef_dynamic()); - } - - @SuppressWarnings("unchecked") - public void test(PolicyDefinition pd) throws Exception { - Map<String, StreamDefinition> sds = new HashMap<>(); - StreamDefinition sd = buildStreamDef(); - sds.put("testInputStream", sd); - NoDataPolicyHandler handler = new NoDataPolicyHandler(sds); - - PolicyHandlerContext context = new PolicyHandlerContext(); - context.setPolicyDefinition(pd); - handler.prepare(new TestCollector(), context); - - handler.send(buildStreamEvt(0, "host1", 12.5)); - handler.send(buildStreamEvt(0, "host2", 12.6)); - handler.send(buildStreamEvt(100, "host1", 20.9)); - handler.send(buildStreamEvt(120, "host2", 22.1)); - handler.send(buildStreamEvt(4000, "host2", 22.1)); - handler.send(buildStreamEvt(50000, "host2", 22.1)); - handler.send(buildStreamEvt(60150, "host2", 22.3)); - handler.send(buildStreamEvt(60450, "host2", 22.9)); - handler.send(buildStreamEvt(75000, "host1", 41.6)); - handler.send(buildStreamEvt(85000, "host2", 45.6)); - } - - @SuppressWarnings("rawtypes") - private static class TestCollector implements Collector { - @Override - public void emit(Object o) { - AlertStreamEvent e = (AlertStreamEvent) o; - Object[] data = e.getData(); - Assert.assertEquals("host2", data[1]); - LOG.info(e.toString()); - } - } - - private PolicyDefinition buildPolicyDef_provided() { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("PT1M,provided,1,host,host1,host2"); - def.setType("nodataalert"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList(inputStream)); - pd.setOutputStreams(Arrays.asList(outputStream)); - pd.setName("nodataalert-test"); - return pd; - } - - private PolicyDefinition buildPolicyDef_dynamic() { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("PT1M,dynamic,1,host"); - def.setType("nodataalert"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList(inputStream)); - pd.setOutputStreams(Arrays.asList(outputStream)); - pd.setName("nodataalert-test"); - return pd; - } - - private StreamDefinition buildStreamDef() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn valueColumn = new StreamColumn(); - valueColumn.setName("value"); - valueColumn.setType(StreamColumn.Type.DOUBLE); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); - sd.setDataSource("testDataSource"); - sd.setStreamId("testStreamId"); - return sd; - } - - private StreamEvent buildStreamEvt(long ts, String host, double value) { - StreamEvent e = new StreamEvent(); - e.setData(new Object[] {ts, host, value}); - e.setStreamId(inputStream); - e.setTimestamp(ts); - return e; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java deleted file mode 100644 index 334db29..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/nodata/TestNoDataPolicyTimeBatchHandler.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.nodata; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.eagle.alert.engine.Collector; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext; -import org.apache.eagle.alert.engine.evaluator.nodata.NoDataPolicyTimeBatchHandler; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestNoDataPolicyTimeBatchHandler { - - private static final Logger LOG = LoggerFactory.getLogger(TestNoDataPolicyTimeBatchHandler.class); - - private static final String inputStream = "testInputStream"; - private static final String outputStream = "testOutputStream"; - - @Before - public void setup() { - } - - @SuppressWarnings("unchecked") - @Test - public void testDynamic1() throws Exception { - Map<String, StreamDefinition> sds = new HashMap<>(); - sds.put("testInputStream", buildStreamDef()); - sds.put("testOutputStream", buildOutputStreamDef()); - NoDataPolicyTimeBatchHandler handler = new NoDataPolicyTimeBatchHandler(sds); - - PolicyHandlerContext context = new PolicyHandlerContext(); - context.setPolicyDefinition(buildPolicyDef_dynamic()); - handler.prepare(new TestCollector(), context); - - long now = System.currentTimeMillis(); - - handler.send(buildStreamEvt(now, "host1", 12.5)); - - Thread.sleep(2000); - - handler.send(buildStreamEvt(now, "host2", 12.6)); - handler.send(buildStreamEvt(now, "host1", 20.9)); - handler.send(buildStreamEvt(now, "host2", 22.1)); - handler.send(buildStreamEvt(now, "host2", 22.1)); - - Thread.sleep(5000); - - handler.send(buildStreamEvt(now, "host2", 22.1)); - handler.send(buildStreamEvt(now, "host2", 22.3)); - - Thread.sleep(5000); - - handler.send(buildStreamEvt(now, "host2", 22.9)); - handler.send(buildStreamEvt(now, "host1", 41.6)); - handler.send(buildStreamEvt(now, "host2", 45.6)); - - Thread.sleep(1000); - } - - @SuppressWarnings("rawtypes") - private static class TestCollector implements Collector { - @Override - public void emit(Object o) { - AlertStreamEvent e = (AlertStreamEvent) o; - Object[] data = e.getData(); - - LOG.info("alert data: {}, {}", data[1], data[0]); - } - } - - private PolicyDefinition buildPolicyDef_dynamic() { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - def.setValue("PT5S,dynamic"); - def.setType("nodataalert"); - Map<String, Object> properties = new HashMap<String, Object>(); - properties.put("nodataColumnName", "host"); - def.setProperties(properties); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList(inputStream)); - pd.setOutputStreams(Arrays.asList(outputStream)); - pd.setName("nodataalert-test"); - return pd; - } - - private StreamDefinition buildStreamDef() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn valueColumn = new StreamColumn(); - valueColumn.setName("value"); - valueColumn.setType(StreamColumn.Type.DOUBLE); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); - sd.setDataSource("testDataSource"); - sd.setStreamId("testInputStream"); - return sd; - } - - private StreamDefinition buildOutputStreamDef() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn valueColumn = new StreamColumn(); - valueColumn.setName("originalStreamName"); - valueColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn)); - sd.setDataSource("testDataSource"); - sd.setStreamId("testOutputStream"); - return sd; - } - - private StreamEvent buildStreamEvt(long ts, String host, double value) { - StreamEvent e = new StreamEvent(); - e.setData(new Object[] {ts, host, value}); - e.setStreamId(inputStream); - e.setTimestamp(ts); - return e; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java deleted file mode 100644 index 82d8c99..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/perf/TestSerDeserPer.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.perf; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; - -/** - * Since 5/13/16. - */ -public class TestSerDeserPer { - @Rule - public TemporaryFolder temporaryFolder = new TemporaryFolder(); - - Object[] data = null; - - @Before - public void before() { - int max = 100; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < max; i++) { - sb.append("a"); - } - data = new Object[] {sb.toString()}; - } - - @Test - public void testSerDeserPerf() throws Exception { - Kryo kryo = new Kryo(); - String outputPath = temporaryFolder.newFile().toString(); - Output output = new Output(new FileOutputStream(outputPath)); - for (int i = 0; i < 1000; i++) { - kryo.writeObject(output, constructPE()); - } - output.close(); - Input input = new Input(new FileInputStream(outputPath)); - PartitionedEvent someObject = kryo.readObject(input, PartitionedEvent.class); - input.close(); - Assert.assertTrue(someObject.getData().length == 1); - } - - private PartitionedEvent constructPE() { - StreamEvent e = new StreamEvent(); - e.setStreamId("testStreamId"); - e.setTimestamp(1463159382000L); - e.setData(data); - StreamPartition sp = new StreamPartition(); - List<String> col = new ArrayList<>(); - col.add("host"); - sp.setColumns(col); - StreamSortSpec sortSpec = new StreamSortSpec(); - sortSpec.setWindowMargin(30000); - sortSpec.setWindowPeriod("PT1M"); - sp.setSortSpec(sortSpec); - sp.setStreamId("testStreamId"); - sp.setType(StreamPartition.Type.GROUPBY); - PartitionedEvent pe = new PartitionedEvent(); - pe.setEvent(e); - pe.setPartition(sp); - pe.setPartitionKey(1000); - return pe; - } - - @Test - public void testSerDeserPerf2() throws Exception { - Kryo kryo = new Kryo(); - String outputPath = temporaryFolder.newFile().toString(); - Output output = new Output(new FileOutputStream(outputPath)); - for (int i = 0; i < 1000; i++) { - kryo.writeObject(output, constructNewPE()); - } - output.close(); - Input input = new Input(new FileInputStream(outputPath)); - NewPartitionedEvent someObject = kryo.readObject(input, NewPartitionedEvent.class); - input.close(); - Assert.assertTrue(someObject.getData().length == 1); - } - - private NewPartitionedEvent constructNewPE() { - NewPartitionedEvent pe = new NewPartitionedEvent(); - pe.setStreamId("testStreamId"); - pe.setTimestamp(1463159382000L); - pe.setData(data); - - pe.setType(StreamPartition.Type.GROUPBY); - List<String> col = new ArrayList<>(); - col.add("host"); - pe.setColumns(col); - pe.setPartitionKey(1000); - - pe.setWindowMargin(30000); - pe.setWindowPeriod("PT1M"); - return pe; - } - - @Test - public void testSerDeserPerf3() throws Exception { - Kryo kryo = new Kryo(); - String outputPath = temporaryFolder.newFile().toString(); - Output output = new Output(new FileOutputStream(outputPath)); - for (int i = 0; i < 1000; i++) { - kryo.writeObject(output, constructNewPE2()); - } - output.close(); - Input input = new Input(new FileInputStream(outputPath)); - NewPartitionedEvent2 someObject = kryo.readObject(input, NewPartitionedEvent2.class); - input.close(); - Assert.assertTrue(someObject.getData().length == 1); - } - - private NewPartitionedEvent2 constructNewPE2() { - NewPartitionedEvent2 pe = new NewPartitionedEvent2(); - pe.setStreamId(100); - pe.setTimestamp(1463159382000L); - pe.setData(data); - - pe.setType(1); - int[] col = new int[1]; - col[0] = 1; - pe.setColumns(col); - pe.setPartitionKey(1000); - - pe.setWindowMargin(30000); - pe.setWindowPeriod(60); - return pe; - } - - public static class NewPartitionedEvent implements Serializable { - private static final long serialVersionUID = -3840016190614238593L; - // basic - private String streamId; - private long timestamp; - private Object[] data; - - // stream partition - private StreamPartition.Type type; - private List<String> columns = new ArrayList<>(); - private long partitionKey; - - // sort spec - private String windowPeriod = ""; - private long windowMargin = 30 * 1000; - - public NewPartitionedEvent() { - } - - public String getStreamId() { - return streamId; - } - - public void setStreamId(String streamId) { - this.streamId = streamId; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public Object[] getData() { - return data; - } - - public void setData(Object[] data) { - this.data = data; - } - - public StreamPartition.Type getType() { - return type; - } - - public void setType(StreamPartition.Type type) { - this.type = type; - } - - public List<String> getColumns() { - return columns; - } - - public void setColumns(List<String> columns) { - this.columns = columns; - } - - public long getPartitionKey() { - return partitionKey; - } - - public void setPartitionKey(long partitionKey) { - this.partitionKey = partitionKey; - } - - public String getWindowPeriod() { - return windowPeriod; - } - - public void setWindowPeriod(String windowPeriod) { - this.windowPeriod = windowPeriod; - } - - public long getWindowMargin() { - return windowMargin; - } - - public void setWindowMargin(long windowMargin) { - this.windowMargin = windowMargin; - } - } - - public static class NewPartitionedEvent2 implements Serializable { - private static final long serialVersionUID = -3840016190614238593L; - // basic - private int streamId; - private long timestamp; - private Object[] data; - - // stream partition - private int type; - private int[] columns; - private long partitionKey; - - // sort spec - private long windowPeriod; - private long windowMargin = 30 * 1000; - - public NewPartitionedEvent2() { - } - - public int getStreamId() { - return streamId; - } - - public void setStreamId(int streamId) { - this.streamId = streamId; - } - - public int getType() { - return type; - } - - public void setType(int type) { - this.type = type; - } - - public int[] getColumns() { - return columns; - } - - public void setColumns(int[] columns) { - this.columns = columns; - } - - public long getPartitionKey() { - return partitionKey; - } - - public void setPartitionKey(long partitionKey) { - this.partitionKey = partitionKey; - } - - public long getWindowPeriod() { - return windowPeriod; - } - - public void setWindowPeriod(long windowPeriod) { - this.windowPeriod = windowPeriod; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public Object[] getData() { - return data; - } - - public void setData(Object[] data) { - this.data = data; - } - - public long getWindowMargin() { - return windowMargin; - } - - public void setWindowMargin(long windowMargin) { - this.windowMargin = windowMargin; - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java deleted file mode 100644 index 50fb07d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertEmailPublisherTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher; - -import com.dumbster.smtp.SimpleSmtpServer; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.publisher.email.AlertEmailConstants; -import org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -public class AlertEmailPublisherTest { - private static final String EMAIL_PUBLISHER_TEST_POLICY = "Test Policy Alert"; - private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPublisherTest.class); - private static final int SMTP_PORT = 5025; - private Config config; - private SimpleSmtpServer server; - - @Before - public void setUp() { - config = ConfigFactory.load("application-test.conf"); - server = SimpleSmtpServer.start(SMTP_PORT); - } - - @After - public void clear() { - if (server != null) { - server.stop(); - } - } - - @Test - public void testAlertEmailPublisher() throws Exception { - AlertEmailPublisher publisher = new AlertEmailPublisher(); - Map<String, Object> properties = new HashMap<>(); - properties.put(PublishConstants.SUBJECT, EMAIL_PUBLISHER_TEST_POLICY); - properties.put(PublishConstants.SENDER, "eagle@localhost"); - properties.put(PublishConstants.RECIPIENTS, "somebody@localhost"); - Publishment publishment = new Publishment(); - publishment.setName("testEmailPublishment"); - publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName()); - publishment.setPolicyIds(Collections.singletonList(EMAIL_PUBLISHER_TEST_POLICY)); - publishment.setDedupIntervalMin("PT0M"); - publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName()); - publishment.setProperties(properties); - Map<String, String> conf = new HashMap<>(); - publisher.init(config, publishment, conf); - publisher.onAlert(AlertPublisherTestHelper.mockEvent(EMAIL_PUBLISHER_TEST_POLICY)); - Assert.assertEquals(1, server.getReceivedEmailSize()); - Assert.assertTrue(server.getReceivedEmail().hasNext()); - LOG.info("EMAIL:\n {}", server.getReceivedEmail().next()); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java deleted file mode 100644 index 33cb103..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertFilePublisherTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.publisher; - -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.AlertFilePublisher; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -public class AlertFilePublisherTest { - private static final String TEST_POLICY_ID = "testPolicy"; - - @Test - public void testAlertFilePublisher() throws Exception { - Map<String, Object> properties = new HashMap<>(); - properties.put(PublishConstants.ROTATE_EVERY_KB, 1); - properties.put(PublishConstants.NUMBER_OF_FILES, 1); - - String property = "java.io.tmpdir"; - String tempDir = System.getProperty(property); - System.out.println("OS current temporary directory is " + tempDir); - - //properties.put(PublishConstants.FILE_NAME, tempDir+"eagle-alert.log"); - - Publishment publishment = new Publishment(); - publishment.setName("testFilePublishment"); - publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName()); - publishment.setPolicyIds(Arrays.asList(TEST_POLICY_ID)); - publishment.setDedupIntervalMin("PT0M"); - publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName()); - publishment.setProperties(properties); - - AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID); - - AlertFilePublisher publisher = new AlertFilePublisher(); - publisher.init(null, publishment, null); - - publisher.onAlert(event); - publisher.close(); - - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java deleted file mode 100644 index ddf2001..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertKafkaPublisherTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher; - -import java.util.*; - -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.dedup.DedupCache; -import org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher; -import org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer; -import org.apache.eagle.alert.utils.KafkaEmbedded; -import org.junit.*; - -import com.google.common.collect.ImmutableMap; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; - -public class AlertKafkaPublisherTest { - - private static final String TEST_TOPIC_NAME = "test"; - private static final String TEST_POLICY_ID = "testPolicy"; - private static final int TEST_KAFKA_BROKER_PORT = 59092; - private static final int TEST_KAFKA_ZOOKEEPER_PORT = 52181; - private static KafkaEmbedded kafka; - private static Config config; - - private static List<String> outputMessages = new ArrayList<String>(); - - @BeforeClass - public static void setup() { - kafka = new KafkaEmbedded(TEST_KAFKA_BROKER_PORT, TEST_KAFKA_ZOOKEEPER_PORT); - System.setProperty("config.resource", "/simple/application-integration.conf"); - config = ConfigFactory.load(); - consumeWithOutput(outputMessages); - } - - @AfterClass - public static void end() { - if (kafka != null) { - kafka.shutdown(); - } - } - - @Test @Ignore - public void testAsync() throws Exception { - AlertKafkaPublisher publisher = new AlertKafkaPublisher(); - Map<String, Object> properties = new HashMap<>(); - properties.put(PublishConstants.BROKER_LIST, "localhost:" + TEST_KAFKA_BROKER_PORT); - properties.put(PublishConstants.TOPIC, TEST_TOPIC_NAME); - - List<Map<String, Object>> kafkaClientConfig = new ArrayList<Map<String, Object>>(); - kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "async")); - kafkaClientConfig.add(ImmutableMap.of("name", "batch.num.messages", "value", 3000)); - kafkaClientConfig.add(ImmutableMap.of("name", "queue.buffering.max.ms", "value", 5000)); - kafkaClientConfig.add(ImmutableMap.of("name", "queue.buffering.max.messages", "value", 10000)); - properties.put("kafka_client_config", kafkaClientConfig); - - Publishment publishment = new Publishment(); - publishment.setName("testAsyncPublishment"); - publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName()); - publishment.setPolicyIds(Arrays.asList(TEST_POLICY_ID)); - publishment.setDedupIntervalMin("PT0M"); - publishment.setSerializer(org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer.class.getName()); - publishment.setProperties(properties); - - Map<String, String> conf = new HashMap<String, String>(); - publisher.init(config, publishment, conf); - - AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID); - - outputMessages.clear(); - - publisher.onAlert(event); - Thread.sleep(3000); - Assert.assertEquals(1, outputMessages.size()); - publisher.close(); - } - - @Test @Ignore - public void testSync() throws Exception { - AlertKafkaPublisher publisher = new AlertKafkaPublisher(); - Map<String, Object> properties = new HashMap<>(); - properties.put(PublishConstants.BROKER_LIST, "localhost:" + TEST_KAFKA_BROKER_PORT); - properties.put(PublishConstants.TOPIC, TEST_TOPIC_NAME); - List<Map<String, Object>> kafkaClientConfig = new ArrayList<Map<String, Object>>(); - kafkaClientConfig.add(ImmutableMap.of("name", "producer.type", "value", "sync")); - properties.put("kafka_client_config", kafkaClientConfig); - Publishment publishment = new Publishment(); - publishment.setName("testAsyncPublishment"); - publishment.setType(org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher.class.getName()); - publishment.setPolicyIds(Collections.singletonList(TEST_POLICY_ID)); - publishment.setDedupIntervalMin("PT0M"); - publishment.setSerializer(JsonEventSerializer.class.getName()); - publishment.setProperties(properties); - Map<String, String> conf = new HashMap<>(); - publisher.init(config, publishment, conf); - AlertStreamEvent event = AlertPublisherTestHelper.mockEvent(TEST_POLICY_ID); - outputMessages.clear(); - publisher.onAlert(event); - Thread.sleep(3000); - Assert.assertEquals(1, outputMessages.size()); - publisher.close(); - } - - private static void consumeWithOutput(final List<String> outputMessages) { - Thread t = new Thread(new Runnable() { - @Override - public void run() { - Properties props = new Properties(); - props.put("group.id", "B"); - props.put("zookeeper.connect", "127.0.0.1:" + + TEST_KAFKA_ZOOKEEPER_PORT); - props.put("zookeeper.session.timeout.ms", "4000"); - props.put("zookeeper.sync.time.ms", "2000"); - props.put("auto.commit.interval.ms", "1000"); - props.put("auto.offset.reset", "smallest"); - - ConsumerConnector jcc = null; - try { - ConsumerConfig ccfg = new ConsumerConfig(props); - jcc = Consumer.createJavaConsumerConnector(ccfg); - Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); - topicCountMap.put(TEST_TOPIC_NAME, 1); - Map<String, List<KafkaStream<byte[], byte[]>>> topicMap = jcc.createMessageStreams(topicCountMap); - KafkaStream<byte[], byte[]> cstrm = topicMap.get(TEST_TOPIC_NAME).get(0); - for (MessageAndMetadata<byte[], byte[]> mm : cstrm) { - String message = new String(mm.message()); - outputMessages.add(message); - - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - } - } - } finally { - if (jcc != null) { - jcc.shutdown(); - } - } - } - }); - t.start(); - } - - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java deleted file mode 100644 index bd06c9b..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.publisher; - -import org.apache.eagle.alert.engine.coordinator.*; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.dedup.DedupCache; -import org.junit.Assert; - -import java.util.Arrays; -import java.util.HashMap; - -public class AlertPublisherTestHelper { - - public static AlertStreamEvent mockEvent(String policyId){ - StreamDefinition stream = createStream(); - PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), policyId); - return createEvent(stream, policy, - new Object[] {System.currentTimeMillis(), "host1", "testPolicy-host1-01", "open", 0, 0}); - } - - public static AlertStreamEvent createEvent(StreamDefinition stream, PolicyDefinition policy, Object[] data) { - AlertStreamEvent event = new AlertStreamEvent(); - event.setPolicyId(policy.getName()); - event.setSchema(stream); - event.setStreamId(stream.getStreamId()); - event.setTimestamp(System.currentTimeMillis()); - event.setCreatedTime(System.currentTimeMillis()); - event.setSubject("Namenode Disk Used 98%"); - event.setBody("Disk Usage of Test cluster's name node (<a href=\"#\">namenode.hostname.domain</a>) is <strong style=\"color: red\">98%</strong> at <strong>2016-11-30 12:30:45</strong>, exceeding alert threshold <strong>90</strong>%"); - event.setData(data); - event.ensureAlertId(); - event.setSeverity(AlertSeverity.CRITICAL); - event.setCategory("HDFS"); - event.setContext(new HashMap<String,Object>(){{ - put(AlertPublishEvent.SITE_ID_KEY,"TestCluster"); - }}); - Assert.assertNotNull(event.getAlertId()); - return event; - } - - public static StreamDefinition createStream() { - StreamDefinition sd = new StreamDefinition(); - StreamColumn tsColumn = new StreamColumn(); - tsColumn.setName("timestamp"); - tsColumn.setType(StreamColumn.Type.LONG); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("host"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn alertKeyColumn = new StreamColumn(); - alertKeyColumn.setName("alertKey"); - alertKeyColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - // dedupCount, dedupFirstOccurrence - - StreamColumn dedupCountColumn = new StreamColumn(); - dedupCountColumn.setName("dedupCount"); - dedupCountColumn.setType(StreamColumn.Type.LONG); - - StreamColumn dedupFirstOccurrenceColumn = new StreamColumn(); - dedupFirstOccurrenceColumn.setName(DedupCache.DEDUP_FIRST_OCCURRENCE); - dedupFirstOccurrenceColumn.setType(StreamColumn.Type.LONG); - - sd.setColumns(Arrays.asList(tsColumn, hostColumn, alertKeyColumn, stateColumn, dedupCountColumn, - dedupFirstOccurrenceColumn)); - sd.setDataSource("testDatasource"); - sd.setStreamId("testStream"); - sd.setDescription("test stream"); - return sd; - } - - public static PolicyDefinition createPolicyGroupByStreamId(String streamName, String policyName) { - PolicyDefinition pd = new PolicyDefinition(); - PolicyDefinition.Definition def = new PolicyDefinition.Definition(); - // expression, something like "PT5S,dynamic,1,host" - def.setValue("test"); - def.setType("siddhi"); - pd.setDefinition(def); - pd.setInputStreams(Arrays.asList("inputStream")); - pd.setOutputStreams(Arrays.asList("outputStream")); - pd.setName(policyName); - pd.setDescription(String.format("Test policy for stream %s", streamName)); - - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamName); - sp.setColumns(Arrays.asList("host")); - sp.setType(StreamPartition.Type.GROUPBY); - pd.addPartition(sp); - return pd; - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java deleted file mode 100644 index 3df5fc8..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/PublishementTypeLoaderTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.eagle.alert.engine.publisher; - -import org.junit.Test; - -public class PublishementTypeLoaderTest { - @Test - public void testPublishmentTypeLoader() { - PublishementTypeLoader.loadPublishmentTypes(); - } -}
