http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java deleted file mode 100644 index 46517fe..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.router; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; -import org.apache.eagle.alert.engine.publisher.AlertPublisher; -import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory; -import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; -import org.apache.eagle.alert.engine.runner.AlertPublisherBolt; -import org.apache.eagle.alert.engine.runner.MapComparator; -import org.apache.eagle.alert.engine.utils.MetadataSerDeser; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; - -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.CollectionType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -/** - * @Since 5/14/16. - */ -public class TestAlertPublisherBolt { - - @SuppressWarnings("rawtypes") - @Ignore - @Test - public void test() { - Config config = ConfigFactory.load("application-test.conf"); - AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt"); - publisher.init(config, new HashMap()); - PublishSpec spec = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class); - publisher.onPublishChange(spec.getPublishments(), null, null, null); - AlertStreamEvent event = create("testAlertStream"); - publisher.nextEvent(new PublishPartition(event.getStreamId(), event.getPolicyId(), - spec.getPublishments().get(0).getName(), spec.getPublishments().get(0).getPartitionColumns()), event); - AlertStreamEvent event1 = create("testAlertStream"); - publisher.nextEvent(new PublishPartition(event1.getStreamId(), event1.getPolicyId(), - spec.getPublishments().get(0).getName(), spec.getPublishments().get(0).getPartitionColumns()), event1); - } - - private AlertStreamEvent create(String streamId) { - AlertStreamEvent alert = new AlertStreamEvent(); - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("policy1"); - alert.setPolicyId(policy.getName()); - alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[] {"field_1", 2, "field_3"}); - alert.setStreamId(streamId); - alert.setCreatedBy(this.toString()); - return alert; - } - - - @Test - public void testMapComparatorAdded() { - - PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd1.json"), PublishSpec.class); - PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class); - - Map<String, Publishment> map1 = new HashMap<>(); - Map<String, Publishment> map2 = new HashMap<>(); - spec1.getPublishments().forEach(p -> map1.put(p.getName(), p)); - spec2.getPublishments().forEach(p -> map2.put(p.getName(), p)); - - MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2); - comparator.compare(); - Assert.assertTrue(comparator.getAdded().size() == 1); - - } - - @Test - public void testMapComparatorRemoved() { - - PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class); - PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd1.json"), PublishSpec.class); - - Map<String, Publishment> map1 = new HashMap<>(); - Map<String, Publishment> map2 = new HashMap<>(); - spec1.getPublishments().forEach(p -> map1.put(p.getName(), p)); - spec2.getPublishments().forEach(p -> map2.put(p.getName(), p)); - - MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2); - comparator.compare(); - Assert.assertTrue(comparator.getRemoved().size() == 1); - - } - - @Test - public void testMapComparatorModified() { - - PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForAdd0.json"), PublishSpec.class); - PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishForMdyValue.json"), PublishSpec.class); - - Map<String, Publishment> map1 = new HashMap<>(); - Map<String, Publishment> map2 = new HashMap<>(); - spec1.getPublishments().forEach(p -> map1.put(p.getName(), p)); - spec2.getPublishments().forEach(p -> map2.put(p.getName(), p)); - - MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2); - comparator.compare(); - Assert.assertTrue(comparator.getModified().size() == 1); - - } - - - @Test - public void testMapComparator() { - PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class); - PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), PublishSpec.class); - PublishSpec spec3 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec3.json"), PublishSpec.class); - Map<String, Publishment> map1 = new HashMap<>(); - Map<String, Publishment> map2 = new HashMap<>(); - spec1.getPublishments().forEach(p -> map1.put(p.getName(), p)); - spec2.getPublishments().forEach(p -> map2.put(p.getName(), p)); - - MapComparator<String, Publishment> comparator = new MapComparator<>(map1, map2); - comparator.compare(); - Assert.assertTrue(comparator.getModified().size() == 1); - - AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null); - publisherBolt.onAlertPublishSpecChange(spec1, null); - publisherBolt.onAlertPublishSpecChange(spec2, null); - publisherBolt.onAlertPublishSpecChange(spec3, null); - } - - @SuppressWarnings("rawtypes") - @Test - public void testAlertPublisher() throws Exception { - AlertPublisher alertPublisher = new AlertPublisherImpl("alert-publisher-test"); - Config config = ConfigFactory.load("application-test.conf"); - alertPublisher.init(config, new HashMap()); - List<Publishment> oldPubs = loadEntities("/publishments1.json", Publishment.class); - List<Publishment> newPubs = loadEntities("/publishments2.json", Publishment.class); - alertPublisher.onPublishChange(oldPubs, null, null, null); - alertPublisher.onPublishChange(null, null, newPubs, oldPubs); - } - - private <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception { - ObjectMapper objectMapper = new ObjectMapper(); - JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz)); - List<T> l = objectMapper.readValue(TestAlertPublisherBolt.class.getResourceAsStream(path), type); - return l; - } - - private AlertStreamEvent createWithStreamDef(String hostname, String appName, String state) { - AlertStreamEvent alert = new AlertStreamEvent(); - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("perfmon_cpu_host_check"); - alert.setPolicyId(policy.getName()); - alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[] {appName, hostname, state}); - alert.setStreamId("testAlertStream"); - alert.setCreatedBy(this.toString()); - - // build stream definition - StreamDefinition sd = new StreamDefinition(); - StreamColumn appColumn = new StreamColumn(); - appColumn.setName("appname"); - appColumn.setType(StreamColumn.Type.STRING); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("hostname"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn stateColumn = new StreamColumn(); - stateColumn.setName("state"); - stateColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(appColumn, hostColumn, stateColumn)); - - alert.setSchema(sd); - return alert; - } - - @Test - public void testCustomFieldDedupEvent() throws Exception { - List<Publishment> pubs = loadEntities("/router/publishments.json", Publishment.class); - - AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null); - AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN"); - AlertStreamEvent event2 = createWithStreamDef("host2", "testapp1", "OPEN"); - AlertStreamEvent event3 = createWithStreamDef("host2", "testapp2", "CLOSE"); - - Assert.assertNotNull(plugin.dedup(event1)); - Assert.assertNull(plugin.dedup(event2)); - Assert.assertNotNull(plugin.dedup(event3)); - } - - @Test - public void testEmptyCustomFieldDedupEvent() throws Exception { - List<Publishment> pubs = loadEntities("/router/publishments-empty-dedup-field.json", Publishment.class); - - AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(pubs.get(0), null, null); - AlertStreamEvent event1 = createWithStreamDef("host1", "testapp1", "OPEN"); - AlertStreamEvent event2 = createWithStreamDef("host1", "testapp1", "OPEN"); - - Assert.assertNotNull(plugin.dedup(event1)); - Assert.assertNull(plugin.dedup(event2)); - } - - private AlertStreamEvent createSeverityWithStreamDef(String hostname, String appName, String message, String severity, String docId, String df_device, String df_type, String colo) { - AlertStreamEvent alert = new AlertStreamEvent(); - PolicyDefinition policy = new PolicyDefinition(); - policy.setName("switch_check"); - alert.setPolicyId(policy.getName()); - alert.setCreatedTime(System.currentTimeMillis()); - alert.setData(new Object[] {appName, hostname, message, severity, docId, df_device, df_type, colo}); - alert.setStreamId("testAlertStream"); - alert.setCreatedBy(this.toString()); - - // build stream definition - StreamDefinition sd = new StreamDefinition(); - StreamColumn appColumn = new StreamColumn(); - appColumn.setName("appname"); - appColumn.setType(StreamColumn.Type.STRING); - - StreamColumn hostColumn = new StreamColumn(); - hostColumn.setName("hostname"); - hostColumn.setType(StreamColumn.Type.STRING); - - StreamColumn msgColumn = new StreamColumn(); - msgColumn.setName("message"); - msgColumn.setType(StreamColumn.Type.STRING); - - StreamColumn severityColumn = new StreamColumn(); - severityColumn.setName("severity"); - severityColumn.setType(StreamColumn.Type.STRING); - - StreamColumn docIdColumn = new StreamColumn(); - docIdColumn.setName("docId"); - docIdColumn.setType(StreamColumn.Type.STRING); - - StreamColumn deviceColumn = new StreamColumn(); - deviceColumn.setName("df_device"); - deviceColumn.setType(StreamColumn.Type.STRING); - - StreamColumn deviceTypeColumn = new StreamColumn(); - deviceTypeColumn.setName("df_type"); - deviceTypeColumn.setType(StreamColumn.Type.STRING); - - StreamColumn coloColumn = new StreamColumn(); - coloColumn.setName("dc"); - coloColumn.setType(StreamColumn.Type.STRING); - - sd.setColumns(Arrays.asList(appColumn, hostColumn, msgColumn, severityColumn, docIdColumn, deviceColumn, deviceTypeColumn, coloColumn)); - - alert.setSchema(sd); - return alert; - } - - @Test - public void testSlackPublishment() throws Exception { - Config config = ConfigFactory.load("application-test.conf"); - AlertPublisher publisher = new AlertPublisherImpl("alertPublishBolt"); - publisher.init(config, new HashMap()); - List<Publishment> pubs = loadEntities("/router/publishments-slack.json", Publishment.class); - publisher.onPublishChange(pubs, null, null, null); - - AlertStreamEvent event1 = createSeverityWithStreamDef("switch1", "testapp1", "Memory 1 inconsistency detected", "WARNING", "docId1", "ed01", "distribution switch", "us"); - AlertStreamEvent event2 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 2 inconsistency detected", "CRITICAL", "docId2", "ed02", "distribution switch", "us"); - AlertStreamEvent event3 = createSeverityWithStreamDef("switch2", "testapp2", "Memory 3 inconsistency detected", "WARNING", "docId3", "ed02", "distribution switch", "us"); - - publisher.nextEvent(new PublishPartition(event1.getStreamId(), event1.getPolicyId(), - pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event1); - publisher.nextEvent(new PublishPartition(event2.getStreamId(), event2.getPolicyId(), - pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event2); - publisher.nextEvent(new PublishPartition(event3.getStreamId(), event3.getPolicyId(), - pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3); - - } -}
http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java deleted file mode 100644 index 704857d..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestStreamRouterBoltOutputCollector.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.eagle.alert.engine.router; - -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Tuple; -import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.engine.StreamContext; -import org.apache.eagle.alert.engine.StreamContextImpl; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; - -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.engine.router.impl.StormOutputCollector; -import org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import java.text.ParseException; -import java.util.*; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -public class TestStreamRouterBoltOutputCollector { - - @Test - public void testStreamRouterCollector() throws ParseException { - String streamId = "HDFS_AUDIT_LOG_ENRICHED_STREAM_SANDBOX"; - StreamPartition partition = new StreamPartition(); - partition.setStreamId(streamId); - partition.setType(StreamPartition.Type.GROUPBY); - partition.setColumns(new ArrayList<String>() {{ - add("col1"); - }}); - - // begin to create two router specs - WorkSlot worker1 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt1"); - WorkSlot worker2 = new WorkSlot("ALERT_UNIT_TOPOLOGY_APP_SANDBOX", "alertBolt2"); - - PolicyWorkerQueue queue1 = new PolicyWorkerQueue(); - queue1.setPartition(partition); - queue1.setWorkers(new ArrayList<WorkSlot>() { - { - add(worker1); - } - }); - - PolicyWorkerQueue queue2 = new PolicyWorkerQueue(); - queue2.setPartition(partition); - queue2.setWorkers(new ArrayList<WorkSlot>() { - { - add(worker1); - add(worker2); - } - }); - - StreamRouterSpec spec1 = new StreamRouterSpec(); - spec1.setStreamId(streamId); - spec1.setPartition(partition); - - spec1.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{ - add(queue1); - }}); - - StreamRouterSpec spec2 = new StreamRouterSpec(); - spec2.setStreamId(streamId); - spec2.setPartition(partition); - - spec2.setTargetQueue(new ArrayList<PolicyWorkerQueue>() {{ - add(queue2); - }}); - - // the end of creating - - List<String> targetStreamIds = new ArrayList<>(); - IOutputCollector delegate = new IOutputCollector() { - - @Override - public void reportError(Throwable error) { - - } - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - targetStreamIds.add(streamId); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - }; - - List<StreamRouterSpec> list1 = new ArrayList<>(); - list1.add(spec1); - - List<StreamRouterSpec> list2 = new ArrayList<>(); - list2.add(spec2); - - // construct StreamDefinition - StreamDefinition schema = new StreamDefinition(); - schema.setStreamId(streamId); - StreamColumn column = new StreamColumn(); - column.setName("col1"); - column.setType(StreamColumn.Type.STRING); - schema.setColumns(Collections.singletonList(column)); - Map<String, StreamDefinition> sds = new HashMap<>(); - sds.put(schema.getStreamId(), schema); - - // create two events - StreamEvent event1 = new StreamEvent(); - Object[] data = new Object[]{"value1"}; - event1.setData(data); - event1.setStreamId(streamId); - PartitionedEvent pEvent1 = new PartitionedEvent(); - pEvent1.setEvent(event1); - pEvent1.setPartition(partition); - - StreamEvent event2 = new StreamEvent(); - Object[] data2 = new Object[]{"value3"}; - event2.setData(data2); - event2.setStreamId(streamId); - PartitionedEvent pEvent2 = new PartitionedEvent(); - pEvent2.setEvent(event2); - pEvent2.setPartition(partition); - - TopologyContext context = Mockito.mock(TopologyContext.class); - when(context.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric()); - StreamContext streamContext = new StreamContextImpl(null, context.registerMetric("eagle.router", new MultiCountMetric(), 60), context); - StreamRouterBoltOutputCollector collector = new StreamRouterBoltOutputCollector("test", new StormOutputCollector(new OutputCollector(delegate), null), null, streamContext); - - // add a StreamRouterSpec which has one worker - collector.onStreamRouterSpecChange(list1, new ArrayList<>(), new ArrayList<>(), sds); - collector.emit(pEvent1); - Assert.assertTrue(targetStreamIds.size() == 1); - - // modify the StreamRouterSpec to contain two workers - collector.onStreamRouterSpecChange(new ArrayList<>(), new ArrayList<>(), list2, sds); - collector.emit(pEvent2); - Assert.assertTrue(targetStreamIds.size() == 2); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java deleted file mode 100644 index a480fcf..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/runner/TestStreamRouterBolt.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.runner; - -import backtype.storm.metric.api.MultiCountMetric; -import backtype.storm.task.GeneralTopologyContext; -import backtype.storm.task.IOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue; -import org.apache.eagle.alert.coordination.model.RouterSpec; -import org.apache.eagle.alert.coordination.model.StreamRouterSpec; -import org.apache.eagle.alert.coordination.model.WorkSlot; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.coordinator.StreamSortSpec; -import org.apache.eagle.alert.engine.coordinator.impl.AbstractMetadataChangeNotifyService; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.utils.StreamIdConversion; -import org.apache.eagle.common.DateTimeUtil; - -import org.joda.time.Period; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class TestStreamRouterBolt { - private final static Logger LOG = LoggerFactory.getLogger(TestStreamRouterBolt.class); - - /** - * Mocked 5 Events - * <p> - * 1. Sent in random order: - * "value1","value2","value3","value4","value5" - * <p> - * 2. Received correct time order and value5 is thrown because too late: "value2","value1","value3","value4" - * - * @throws Exception - */ - @SuppressWarnings("rawtypes") - @Test - public void testRouterWithSortAndRouteSpec() throws Exception { - Config config = ConfigFactory.load(); - MockChangeService mockChangeService = new MockChangeService(); - StreamRouterBolt routerBolt = new StreamRouterBolt("routerBolt1", config, mockChangeService); - - final Map<String, List<PartitionedEvent>> streamCollected = new HashMap<>(); - final List<PartitionedEvent> orderCollected = new ArrayList<>(); - - OutputCollector collector = new OutputCollector(new IOutputCollector() { - int count = 0; - - @Override - public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { - PartitionedEvent event; - try { - event = routerBolt.deserialize(tuple.get(0)); - } catch (IOException e) { - throw new RuntimeException(e); - } - if (count == 0) { - count++; - } - LOG.info(String.format("Collector received: [streamId=[%s], tuple=[%s] ", streamId, tuple)); - if (!streamCollected.containsKey(streamId)) { - streamCollected.put(streamId, new ArrayList<>()); - } - streamCollected.get(streamId).add(event); - orderCollected.add(event); - return null; - } - - @Override - public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { - } - - @Override - public void ack(Tuple input) { - } - - @Override - public void fail(Tuple input) { - } - - @SuppressWarnings("unused") - public void resetTimeout(Tuple input) { - } - - @Override - public void reportError(Throwable error) { - } - }); - - Map stormConf = new HashMap<>(); - TopologyContext topologyContext = mock(TopologyContext.class); - when(topologyContext.registerMetric(any(String.class), any(MultiCountMetric.class), any(int.class))).thenReturn(new MultiCountMetric()); - routerBolt.prepare(stormConf, topologyContext, collector); - - String streamId = "cpuUsageStream"; - // StreamPartition, groupby col1 for stream cpuUsageStream - StreamPartition sp = new StreamPartition(); - sp.setStreamId(streamId); - sp.setColumns(Collections.singletonList("col1")); - sp.setType(StreamPartition.Type.GROUPBY); - - StreamSortSpec sortSpec = new StreamSortSpec(); -// sortSpec.setColumn("timestamp"); -// sortSpec.setOrder("asc"); - sortSpec.setWindowPeriod2(Period.minutes(1)); - sortSpec.setWindowMargin(1000); - sp.setSortSpec(sortSpec); - - RouterSpec boltSpec = new RouterSpec(); - - // set StreamRouterSpec to have 2 WorkSlot - StreamRouterSpec routerSpec = new StreamRouterSpec(); - routerSpec.setPartition(sp); - routerSpec.setStreamId(streamId); - PolicyWorkerQueue queue = new PolicyWorkerQueue(); - queue.setPartition(sp); - queue.setWorkers(Arrays.asList(new WorkSlot("testTopology", "alertBolt1"), new WorkSlot("testTopology", "alertBolt2"))); - routerSpec.setTargetQueue(Collections.singletonList(queue)); - boltSpec.addRouterSpec(routerSpec); - boltSpec.setVersion("version1"); - - // construct StreamDefinition - StreamDefinition schema = new StreamDefinition(); - schema.setStreamId(streamId); - StreamColumn column = new StreamColumn(); - column.setName("col1"); - column.setType(StreamColumn.Type.STRING); - schema.setColumns(Collections.singletonList(column)); - Map<String, StreamDefinition> sds = new HashMap<>(); - sds.put(schema.getStreamId(), schema); - - routerBolt.declareOutputStreams(Arrays.asList("alertBolt1", "alertBolt2")); - routerBolt.onStreamRouteBoltSpecChange(boltSpec, sds); - GeneralTopologyContext context = mock(GeneralTopologyContext.class); - int taskId = 1; - when(context.getComponentId(taskId)).thenReturn("comp1"); - when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0")); - - // ======================================= - // Mock 5 Events - // - // 1. Sent in random order: - // "value1","value2","value3","value4","value5" - // - // 2. Received correct time order and value5 is thrown because too: - // "value2","value1","value3","value4" - // ======================================= - - // construct event with "value1" - StreamEvent event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:30") * 1000); - Object[] data = new Object[] {"value1"}; - event.setData(data); - event.setStreamId(streamId); - PartitionedEvent pEvent = new PartitionedEvent(); - pEvent.setEvent(event); - pEvent.setPartition(sp); - Tuple input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default"); - routerBolt.execute(input); - - // construct another event with "value2" - event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:10") * 1000); - data = new Object[] {"value2"}; - event.setData(data); - event.setStreamId(streamId); - pEvent = new PartitionedEvent(); - pEvent.setPartition(sp); - pEvent.setEvent(event); - input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default"); - routerBolt.execute(input); - - // construct another event with "value3" - event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:01:40") * 1000); - data = new Object[] {"value3"}; - event.setData(data); - event.setStreamId(streamId); - pEvent = new PartitionedEvent(); - pEvent.setPartition(sp); - pEvent.setEvent(event); - input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default"); - routerBolt.execute(input); - - // construct another event with "value4" - event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:02:10") * 1000); - data = new Object[] {"value4"}; - event.setData(data); - event.setStreamId(streamId); - pEvent = new PartitionedEvent(); - pEvent.setPartition(sp); - pEvent.setEvent(event); - input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default"); - routerBolt.execute(input); - - // construct another event with "value5", which will be thrown because two late - event = new StreamEvent(); - event.setTimestamp(DateTimeUtil.humanDateToSeconds("2016-01-01 00:00:10") * 1000); - data = new Object[] {"value5"}; - event.setData(data); - event.setStreamId(streamId); - pEvent = new PartitionedEvent(); - pEvent.setPartition(sp); - pEvent.setEvent(event); - input = new TupleImpl(context, Collections.singletonList(pEvent), taskId, "default"); - routerBolt.execute(input); - - Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size()); - Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt1", streamCollected.keySet().contains( - String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt1")))); - Assert.assertTrue("Should collect stream stream_routerBolt_to_alertBolt2", streamCollected.keySet().contains( - String.format(StreamIdConversion.generateStreamIdBetween(routerBolt.getBoltId(), "alertBolt2")))); - - Assert.assertEquals("Should finally collect 3 events", 3, orderCollected.size()); - Assert.assertArrayEquals("Should sort 3 events in ASC order", new String[] {"value2", "value1", "value3"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray()); - - // The first 3 events are ticked automatically by window - - routerBolt.cleanup(); - - // Close will flush all events in memory, so will receive the last event which is still in memory as window is not expired according to clock - // The 5th event will be thrown because too late and out of margin - - Assert.assertEquals("Should finally collect two streams", 2, streamCollected.size()); - Assert.assertEquals("Should finally collect 3 events", 4, orderCollected.size()); - Assert.assertArrayEquals("Should sort 4 events in ASC-ordered timestamp", new String[] {"value2", "value1", "value3", "value4"}, orderCollected.stream().map((d) -> d.getData()[0]).toArray()); - - } - - @SuppressWarnings("serial") - public static class MockChangeService extends AbstractMetadataChangeNotifyService { - private final static Logger LOG = LoggerFactory.getLogger(MockChangeService.class); - - @Override - public void close() throws IOException { - LOG.info("Closing"); - } - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java deleted file mode 100644 index a3939cc..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/JavaSerializationTest.java +++ /dev/null @@ -1,112 +0,0 @@ -package org.apache.eagle.alert.engine.serialization; - -import org.apache.commons.lang3.SerializationUtils; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.coordinator.StreamPartition; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.model.StreamEvent; -import org.apache.eagle.alert.utils.ByteUtils; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public class JavaSerializationTest { - private final static Logger LOG = LoggerFactory.getLogger(JavaSerializationTest.class); - - @Test - public void testJavaSerialization() { - PartitionedEvent partitionedEvent = new PartitionedEvent(); - partitionedEvent.setPartitionKey(partitionedEvent.hashCode()); - partitionedEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream", Arrays.asList("name", "host"))); - StreamEvent event = new StreamEvent(); - event.setStreamId("sampleStream"); - event.setTimestamp(System.currentTimeMillis()); - event.setData(new Object[] {"CPU", "LOCALHOST", true, Long.MAX_VALUE, 60.0}); - partitionedEvent.setEvent(event); - - int javaSerializationLength = SerializationUtils.serialize(partitionedEvent).length; - LOG.info("Java serialization length: {}, event: {}", javaSerializationLength, partitionedEvent); - - int compactLength = 0; - compactLength += "sampleStream".getBytes().length; - compactLength += ByteUtils.intToBytes(partitionedEvent.getPartition().hashCode()).length; - compactLength += ByteUtils.longToBytes(partitionedEvent.getTimestamp()).length; - compactLength += "CPU".getBytes().length; - compactLength += "LOCALHOST".getBytes().length; - compactLength += 1; - compactLength += ByteUtils.longToBytes(Long.MAX_VALUE).length; - compactLength += ByteUtils.doubleToBytes(60.0).length; - - LOG.info("Compact serialization length: {}, event: {}", compactLength, partitionedEvent); - Assert.assertTrue(compactLength * 20 < javaSerializationLength); - } - - - public static StreamDefinition createSampleStreamDefinition(String streamId) { - StreamDefinition sampleStreamDefinition = new StreamDefinition(); - sampleStreamDefinition.setStreamId(streamId); - sampleStreamDefinition.setTimeseries(true); - sampleStreamDefinition.setValidate(true); - sampleStreamDefinition.setDescription("Schema for " + streamId); - List<StreamColumn> streamColumns = new ArrayList<>(); - - streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build()); - streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build()); - streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build()); - streamColumns.add(new StreamColumn.Builder().name("data").type(StreamColumn.Type.LONG).build()); - streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build()); - sampleStreamDefinition.setColumns(streamColumns); - return sampleStreamDefinition; - } - - public static StreamPartition createSampleStreamGroupbyPartition(String streamId, List<String> groupByField) { - StreamPartition streamPartition = new StreamPartition(); - streamPartition.setStreamId(streamId); - streamPartition.setColumns(groupByField); - streamPartition.setType(StreamPartition.Type.GROUPBY); - return streamPartition; - } - - @SuppressWarnings("serial") - public static PartitionedEvent createSimpleStreamEvent() { - StreamEvent event = StreamEvent.builder() - .schema(createSampleStreamDefinition("sampleStream_1")) - .streamId("sampleStream_1") - .timestamep(System.currentTimeMillis()) - .attributes(new HashMap<String, Object>() {{ - put("name", "cpu"); - put("host", "localhost"); - put("flag", true); - put("value", 60.0); - put("data", Long.MAX_VALUE); - put("unknown", "unknown column value"); - }}).build(); - PartitionedEvent pEvent = new PartitionedEvent(); - pEvent.setEvent(event); - pEvent.setPartition(createSampleStreamGroupbyPartition("sampleStream_1", Arrays.asList("name", "host"))); - return pEvent; - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java deleted file mode 100644 index 5a81e26..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/serialization/PartitionedEventSerializerTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.serialization; - -import backtype.storm.serialization.DefaultKryoFactory; -import backtype.storm.serialization.DefaultSerializationDelegate; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.io.ByteArrayDataInput; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; -import org.apache.commons.lang.time.StopWatch; -import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.serialization.impl.PartitionedEventSerializerImpl; -import org.apache.eagle.alert.utils.TimePeriodUtils; -import org.joda.time.Period; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.BitSet; - - -public class PartitionedEventSerializerTest { - private final static Logger LOG = LoggerFactory.getLogger(PartitionedEventSerializerTest.class); - - @SuppressWarnings("deprecation") - @Test - public void testPartitionEventSerialization() throws IOException { - PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis()); - ; - PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition); - - ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput(); - serializer.serialize(partitionedEvent, dataOutput1); - byte[] serializedBytes = dataOutput1.toByteArray(); - PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes)); - Assert.assertEquals(partitionedEvent, deserializedEvent); - - PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true); - - byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent); - PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed); - Assert.assertEquals(partitionedEvent, deserializedEventCompressed); - - PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition); - ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput(); - serializer2.serialize(partitionedEvent, dataOutput2); - byte[] serializedBytes2 = dataOutput2.toByteArray(); - ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2); - PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2); - Assert.assertEquals(partitionedEvent, deserializedEvent2); - - byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent); - Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); - Output output = new Output(10000); - kryo.writeClassAndObject(output, partitionedEvent); - byte[] kryoBytes = output.toBytes(); - Input input = new Input(kryoBytes); - PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input); - Assert.assertEquals(partitionedEvent, kryoDeserializedEvent); - LOG.info("\nCached Stream:{}\nCompressed Cached Stream :{}\nCached Stream + Cached Partition: {}\nJava Native: {}\nKryo: {}\nKryo + Cached Stream: {}\nKryo + Cached Stream + Cached Partition: {}", serializedBytes.length, serializedBytesCompressed.length, serializedBytes2.length, javaSerialization.length, kryoBytes.length, kryoSerialize(serializedBytes).length, kryoSerialize(serializedBytes2).length); - } - - @SuppressWarnings("deprecation") - @Test - public void testPartitionEventSerializationEfficiency() throws IOException { - PartitionedEvent partitionedEvent = MockSampleMetadataFactory.createPartitionedEventGroupedByName("sampleStream", System.currentTimeMillis()); - ; - PartitionedEventSerializerImpl serializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition); - - int count = 100000; - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - int i = 0; - while (i < count) { - ByteArrayDataOutput dataOutput1 = ByteStreams.newDataOutput(); - serializer.serialize(partitionedEvent, dataOutput1); - byte[] serializedBytes = dataOutput1.toByteArray(); - PartitionedEvent deserializedEvent = serializer.deserialize(ByteStreams.newDataInput(serializedBytes)); - Assert.assertEquals(partitionedEvent, deserializedEvent); - i++; - } - stopWatch.stop(); - LOG.info("Cached Stream: {} ms", stopWatch.getTime()); - stopWatch.reset(); - PartitionedEventSerializerImpl compressSerializer = new PartitionedEventSerializerImpl(MockSampleMetadataFactory::createSampleStreamDefinition, true); - i = 0; - stopWatch.start(); - while (i < count) { - byte[] serializedBytesCompressed = compressSerializer.serialize(partitionedEvent); - PartitionedEvent deserializedEventCompressed = compressSerializer.deserialize(serializedBytesCompressed); - Assert.assertEquals(partitionedEvent, deserializedEventCompressed); - i++; - } - stopWatch.stop(); - LOG.info("Compressed Cached Stream: {} ms", stopWatch.getTime()); - stopWatch.reset(); - - i = 0; - stopWatch.start(); - while (i < count) { - PartitionedEventDigestSerializer serializer2 = new PartitionedEventDigestSerializer(MockSampleMetadataFactory::createSampleStreamDefinition); - ByteArrayDataOutput dataOutput2 = ByteStreams.newDataOutput(); - serializer2.serialize(partitionedEvent, dataOutput2); - byte[] serializedBytes2 = dataOutput2.toByteArray(); - ByteArrayDataInput dataInput2 = ByteStreams.newDataInput(serializedBytes2); - PartitionedEvent deserializedEvent2 = serializer2.deserialize(dataInput2); - Assert.assertEquals(partitionedEvent, deserializedEvent2); - i++; - } - stopWatch.stop(); - LOG.info("Cached Stream&Partition: {} ms", stopWatch.getTime()); - stopWatch.reset(); - i = 0; - stopWatch.start(); - while (i < count) { - byte[] javaSerialization = new DefaultSerializationDelegate().serialize(partitionedEvent); - PartitionedEvent javaSerializedEvent = (PartitionedEvent) new DefaultSerializationDelegate().deserialize(javaSerialization); - Assert.assertEquals(partitionedEvent, javaSerializedEvent); - i++; - } - stopWatch.stop(); - LOG.info("Java Native: {} ms", stopWatch.getTime()); - stopWatch.reset(); - i = 0; - stopWatch.start(); - Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); - while (i < count) { - Output output = new Output(10000); - kryo.writeClassAndObject(output, partitionedEvent); - byte[] kryoBytes = output.toBytes(); - Input input = new Input(kryoBytes); - PartitionedEvent kryoDeserializedEvent = (PartitionedEvent) kryo.readClassAndObject(input); - Assert.assertEquals(partitionedEvent, kryoDeserializedEvent); - i++; - } - stopWatch.stop(); - LOG.info("Kryo: {} ms", stopWatch.getTime()); - } - - /** - * Kryo Serialization Length = Length of byte[] + 2 - */ - @Test - public void testKryoByteArraySerialization() { - Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); - byte[] bytes = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - Output output = new Output(1000); - kryo.writeObject(output, bytes); - Assert.assertEquals(bytes.length + 2, output.toBytes().length); - } - - private byte[] kryoSerialize(Object object) { - Kryo kryo = new DefaultKryoFactory.KryoSerializableDefault(); - Output output = new Output(100000); - kryo.writeClassAndObject(output, object); - return output.toBytes(); - } - - @Test - public void testBitSet() { - BitSet bitSet = new BitSet(); - bitSet.set(0, true); // 1 - bitSet.set(1, false); // 0 - bitSet.set(2, true); // 1 - LOG.info("Bit Set Size: {}", bitSet.size()); - LOG.info("Bit Set Byte[]: {}", bitSet.toByteArray()); - LOG.info("Bit Set Byte[]: {}", bitSet.toLongArray()); - LOG.info("BitSet[0]: {}", bitSet.get(0)); - LOG.info("BitSet[1]: {}", bitSet.get(1)); - LOG.info("BitSet[1]: {}", bitSet.get(2)); - - byte[] bytes = bitSet.toByteArray(); - - BitSet bitSet2 = BitSet.valueOf(bytes); - - LOG.info("Bit Set Size: {}", bitSet2.size()); - LOG.info("Bit Set Byte[]: {}", bitSet2.toByteArray()); - LOG.info("Bit Set Byte[]: {}", bitSet2.toLongArray()); - LOG.info("BitSet[0]: {}", bitSet2.get(0)); - LOG.info("BitSet[1]: {}", bitSet2.get(1)); - LOG.info("BitSet[1]: {}", bitSet2.get(2)); - - - BitSet bitSet3 = new BitSet(); - bitSet3.set(0, true); - Assert.assertEquals(1, bitSet3.length()); - - BitSet bitSet4 = new BitSet(); - bitSet4.set(0, false); - Assert.assertEquals(0, bitSet4.length()); - Assert.assertFalse(bitSet4.get(1)); - Assert.assertFalse(bitSet4.get(2)); - } - - @Test - public void testPartitionType() { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java deleted file mode 100644 index 9520b62..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.siddhi; - -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.core.stream.output.StreamCallback; -import org.wso2.siddhi.core.util.EventPrinter; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * @since Jun 21, 2016 - */ -public class SiddhiPolicyTest { - - private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class); - - private String streams = " define stream syslog_stream(" - + "dims_facility string, " - + "dims_severity string, " - + "dims_hostname string, " - + "dims_msgid string, " - + "timestamp string, " - + "conn string, " - + "op string, " - + "msgId string, " - + "command string, " - + "name string, " - + "namespace string, " - + "epochMillis long); "; - private SiddhiManager sm; - - @Before - public void setup() { - sm = new SiddhiManager(); - } - - @After - public void shutdown() { - sm.shutdown(); - } - - @Test - public void testPolicy_grpby() { - String ql = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; "; - StreamCallback sc = new StreamCallback() { - @Override - public void receive(Event[] arg0) { - - } - - ; - }; - - String executionPlan = streams + ql; - ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan); - runtime.addCallback("syslog_severity_check_output", sc); - runtime.start(); - } - - @Ignore - @Test - public void testPolicy_agg() throws Exception { - String sql = " from syslog_stream#window.time(1min) select " - + "name, " - + "namespace, " - + "timestamp, " - + "dims_hostname, " - + "count(*) as abortCount " - + "group by dims_hostname " - + "having abortCount > 3 insert into syslog_severity_check_output; "; - - final AtomicBoolean checked = new AtomicBoolean(false); - StreamCallback sc = new StreamCallback() { - @Override - public void receive(Event[] arg0) { - checked.set(true); - LOG.info("event array size: " + arg0.length); - Set<String> hosts = new HashSet<String>(); - for (Event e : arg0) { - hosts.add((String) e.getData()[3]); - } - - LOG.info(" grouped hosts : " + hosts); - Assert.assertTrue(hosts.contains("HOSTNAME-" + 0)); - Assert.assertTrue(hosts.contains("HOSTNAME-" + 1)); - Assert.assertTrue(hosts.contains("HOSTNAME-" + 2)); - Assert.assertFalse(hosts.contains("HOSTNAME-" + 3)); - } - - ; - }; - - String executionPlan = streams + sql; - ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan); - runtime.addCallback("syslog_severity_check_output", sc); - runtime.start(); - InputHandler handler = runtime.getInputHandler("syslog_stream"); - - sendInput(handler); - - Thread.sleep(1000); - - Assert.assertTrue(checked.get()); - - runtime.shutdown(); - } - - /* - + "dims_facility string, " - + "dims_severity string, " - + "dims_hostname string, " - + "dims_msgid string, " - + "timestamp string, " - + "conn string, " - + "op string, " - + "msgId string, " - + "command string, " - + "name string, " - + "namespace string, " - + "epochMillis long) - */ - private void sendInput(InputHandler handler) throws Exception { - int length = 15; - Event[] events = new Event[length]; - for (int i = 0; i < length; i++) { - Event e = new Event(12); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i % 4, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - - events[i] = e; - } - - handler.send(events); - - Thread.sleep(61 * 1000); - - Event e = new Event(12); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - handler.send(e); - } - - @Ignore - @Test - public void testPolicy_regex() throws Exception { - String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; "; - - AtomicBoolean checked = new AtomicBoolean(); - StreamCallback sc = new StreamCallback() { - @Override - public void receive(Event[] arg0) { - checked.set(true); - } - - ; - }; - - String executionPlan = streams + sql; - ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan); - runtime.addCallback("syslog_severity_check_output", sc); - runtime.start(); - - InputHandler handler = runtime.getInputHandler("syslog_stream"); - - sendInput(handler); - - Thread.sleep(1000); - - Assert.assertTrue(checked.get()); - - runtime.shutdown(); - } - - @Ignore - @Test - public void testPolicy_seq() throws Exception { - String sql = "" - + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> " - + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min " - + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op " - + " insert into syslog_severity_check_output; "; - - AtomicBoolean checked = new AtomicBoolean(); - StreamCallback sc = new StreamCallback() { - @Override - public void receive(Event[] arg0) { - checked.set(true); - } - - ; - }; - - String executionPlan = streams + sql; - ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(executionPlan); - runtime.addCallback("syslog_severity_check_output", sc); - runtime.start(); - InputHandler handler = runtime.getInputHandler("syslog_stream"); - - sendPatternInput(handler); - - Thread.sleep(1000); - Assert.assertTrue(checked.get()); - - runtime.shutdown(); - } - - private void sendPatternInput(InputHandler handler) throws Exception { - // validate one - Event e = new Event(12); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - - e = new Event(12); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - - e = new Event(12); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - - Thread.sleep(61 * 1000); - - e = new Event(12); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); - handler.send(e); - } - - - @Test - public void testStrConcat() throws Exception { - String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " + - " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; "; - SiddhiManager manager = new SiddhiManager(); - ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql); - runtime.addCallback("output", new StreamCallback() { - @Override - public void receive(Event[] events) { - EventPrinter.print(events); - } - }); - - runtime.start(); - - InputHandler logInput = runtime.getInputHandler("log"); - - Event e = new Event(); - e.setTimestamp(System.currentTimeMillis()); - e.setData(new Object[] {System.currentTimeMillis(), "switch-ra-slc-01", "port01", "log-message...."}); - logInput.send(e); - - Thread.sleep(1000); - runtime.shutdown(); - - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java deleted file mode 100644 index 7694623..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/extension/AttributeCollectAggregatorTest.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.siddhi.extension; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.core.stream.output.StreamCallback; - -import java.util.LinkedList; -import java.util.List; -import java.util.Random; - -/** - * @since Apr 1, 2016 - */ -public class AttributeCollectAggregatorTest { - - private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class); - - @Test - public void test() throws Exception { - String ql = "define stream s1(timestamp long, host string, type string);"; - ql += " from s1#window.externalTime(timestamp, 1 sec)"; - ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;"; - - SiddhiManager sm = new SiddhiManager(); - ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql); - - InputHandler input = runtime.getInputHandler("s1"); - runtime.addCallback("output", new StreamCallback() { - - @Override - public void receive(Event[] arg0) { - logger.info("output event length:" + arg0.length); - - for (Event e : arg0) { - StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]"); - for (Object o : e.getData()) { - sb.append("," + o); - } - logger.info(sb.toString()); - } - logger.info("===end==="); - } - }); -// StreamDefinition definition = (StreamDefinition) runtime.getStreamDefinitionMap().get("output"); - - runtime.start(); - - Event[] events = generateEvents(); - for (Event e : events) { - input.send(e); - } - - Thread.sleep(1000); - - } - - private Event[] generateEvents() { - List<Event> events = new LinkedList<Event>(); - - Random r = new Random(); - Event e = null; - long base = System.currentTimeMillis(); - { - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"}); - base += 100; - events.add(e); - } - - { - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"}); - base += 100; - events.add(e); - } - - base += 10000; - { - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"}); - base += 100; - events.add(e); - } - - { - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"}); - base += 100; - events.add(e); - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"}); - base += 100; - events.add(e); - } - base += 10000; - e = new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"}); - - return events.toArray(new Event[0]); - } - - @Test - public void testQuery() { - String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );"; - ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;"; - - SiddhiManager sm = new SiddhiManager(); - sm.createExecutionPlanRuntime(ql); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java deleted file mode 100644 index ebec509..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/MapDBTestSuite.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import org.junit.Assert; -import org.junit.Test; -import org.mapdb.BTreeMap; -import org.mapdb.DB; -import org.mapdb.DBMaker; -import org.mapdb.Serializer; - -public class MapDBTestSuite { - @Test - public void testOnHeapDB() { - DB db = DBMaker.heapDB().make(); - BTreeMap<Long, String> map = db.treeMap("btree").keySerializer(Serializer.LONG).valueSerializer(Serializer.STRING).create(); - Assert.assertFalse(map.putIfAbsentBoolean(1L, "val_1")); - Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_2")); - Assert.assertTrue(map.putIfAbsentBoolean(1L, "val_3")); - Assert.assertFalse(map.putIfAbsentBoolean(2L, "val_4")); - - Assert.assertEquals("val_1", map.get(1L)); - Assert.assertEquals("val_4", map.get(2L)); - - Assert.assertTrue(map.replace(2L, "val_4", "val_5")); - Assert.assertEquals("val_5", map.get(2L)); - - map.close(); - db.close(); - } -} http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java deleted file mode 100644 index ff7b8ee..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/sorter/StreamSortHandlerTest.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.alert.engine.sorter; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.ScheduledReporter; -import com.codahale.metrics.Slf4jReporter; -import com.codahale.metrics.jvm.GarbageCollectorMetricSet; -import com.codahale.metrics.jvm.MemoryUsageGaugeSet; -import com.google.common.collect.Ordering; -import org.apache.commons.lang.time.StopWatch; -import org.apache.eagle.alert.engine.mock.MockPartitionedCollector; -import org.apache.eagle.alert.engine.mock.MockSampleMetadataFactory; -import org.apache.eagle.alert.engine.model.PartitionedEvent; -import org.apache.eagle.alert.engine.sorter.impl.PartitionedEventTimeOrderingComparator; -import org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl; -import org.apache.eagle.alert.engine.sorter.impl.StreamTimeClockInLocalMemory; -import org.apache.eagle.common.DateTimeUtil; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.management.ManagementFactory; -import java.util.*; -import java.util.concurrent.TimeUnit; - -/** - * -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+PrintGCTaskTimeStamps -XX:+PrintGCDetails -verbose:gc - */ -public class StreamSortHandlerTest { - private final static Logger LOG = LoggerFactory.getLogger(StreamSortHandlerTest.class); - - static { - LOG.info(ManagementFactory.getRuntimeMXBean().getName()); - } - - private ScheduledReporter metricReporter; - - @Before - public void setUp() { - final MetricRegistry metrics = new MetricRegistry(); - metrics.registerAll(new MemoryUsageGaugeSet()); - metrics.registerAll(new GarbageCollectorMetricSet()); - metricReporter = Slf4jReporter.forRegistry(metrics) - .filter((name, metric) -> name.matches("(.*heap|pools.PS.*).usage")) - .withLoggingLevel(Slf4jReporter.LoggingLevel.DEBUG) - .convertRatesTo(TimeUnit.SECONDS) - .convertDurationsTo(TimeUnit.MILLISECONDS) - .build(); - metricReporter.start(60, TimeUnit.SECONDS); - } - - /** - * Used to debug window bucket lifecycle - * <p> - * Window period: PT1s, margin: 5s - * - * @throws InterruptedException - */ - @Test - public void testWithUnsortedEventsIn1MinuteWindow() throws InterruptedException { - MockPartitionedCollector mockCollector = new MockPartitionedCollector(); - StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1"); - Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE); - StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl(); - sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1m", 5000), mockCollector); - List<PartitionedEvent> unsortedList = new LinkedList<>(); - - int i = 0; - while (i < 1000) { - PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1"); - sortHandler.nextEvent(event); - unsortedList.add(event); - if (event.getTimestamp() > timeClock.getTime()) { - timeClock.moveForward(event.getTimestamp()); - } - sortHandler.onTick(timeClock, System.currentTimeMillis()); - i++; - } - sortHandler.close(); - Assert.assertFalse(timeOrdering.isOrdered(unsortedList)); - Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get())); - Assert.assertTrue(mockCollector.get().size() > 0); - } - - @Test - public void testStreamSortHandlerWithUnsortedEventsIn1HourWindow() throws InterruptedException { - testWithUnsortedEventsIn1hWindow(1000000); - } - - @Test - public void testSortedInPatient() throws InterruptedException { - MockPartitionedCollector mockCollector = new MockPartitionedCollector(); - StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1"); - Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE); - StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl(); - sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector); - List<PartitionedEvent> sortedList = new LinkedList<>(); - - int i = 0; - while (i < 1000000) { - PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i); - sortHandler.nextEvent(event); - sortedList.add(event); - if (event.getTimestamp() > timeClock.getTime()) { - timeClock.moveForward(event.getTimestamp()); - } - sortHandler.onTick(timeClock, System.currentTimeMillis()); - i++; - } - sortHandler.close(); - Assert.assertTrue(timeOrdering.isOrdered(sortedList)); - Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get())); - Assert.assertEquals(1000000, mockCollector.get().size()); - } - - /** - * -XX:+PrintGC - * - * @throws InterruptedException - */ - @Test @Ignore("Igored heavy benchmark test in unit test") - public void testWithUnsortedEventsInLargeWindowBenchmark() throws InterruptedException { - metricReporter.report(); - testWithUnsortedEventsIn1hWindow(1000); - metricReporter.report(); - testWithUnsortedEventsIn1hWindow(10000); - metricReporter.report(); - testWithUnsortedEventsIn1hWindow(100000); - metricReporter.report(); - testWithUnsortedEventsIn1hWindow(1000000); - metricReporter.report(); - testWithUnsortedEventsIn1hWindow(10000000); - metricReporter.report(); - } - - public void testWithUnsortedEventsIn1hWindow(int count) throws InterruptedException { - MockPartitionedCollector mockCollector = new MockPartitionedCollector(); - StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1"); - Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE); - StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl(); - sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector); - List<PartitionedEvent> unsortedList = new LinkedList<>(); - - StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - int i = 0; - while (i < count) { - PartitionedEvent event = MockSampleMetadataFactory.createRandomOutOfTimeOrderEventGroupedByName("sampleStream_1"); - sortHandler.nextEvent(event); - unsortedList.add(event); - if (event.getEvent().getTimestamp() > timeClock.getTime()) { - timeClock.moveForward(event.getEvent().getTimestamp()); - } - sortHandler.onTick(timeClock, System.currentTimeMillis()); - i++; - } - stopWatch.stop(); - LOG.info("Produced {} events in {} ms", count, stopWatch.getTime()); - sortHandler.close(); - Assert.assertFalse(timeOrdering.isOrdered(unsortedList)); - Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get())); - Assert.assertTrue(mockCollector.get().size() >= 0); - } - - /** - * Used to debug window bucket lifecycle - * <p> - * Window period: PT1h, margin: 5s - * - * @throws InterruptedException - */ - @Test - public void testWithSortedEvents() throws InterruptedException { - MockPartitionedCollector mockCollector = new MockPartitionedCollector(); - StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1"); - Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE); - StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl(); - sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT1h", 5000), mockCollector); - List<PartitionedEvent> sortedList = new LinkedList<>(); - - int i = 0; - while (i < 1000000) { - PartitionedEvent event = MockSampleMetadataFactory.createRandomPartitionedEvent("sampleStream_1", System.currentTimeMillis() + i); - sortHandler.nextEvent(event); - sortedList.add(event); - if (event.getTimestamp() > timeClock.getTime()) { - timeClock.moveForward(event.getTimestamp()); - } - sortHandler.onTick(timeClock, System.currentTimeMillis()); - i++; - } - sortHandler.close(); - Assert.assertTrue(timeOrdering.isOrdered(sortedList)); - Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get())); - Assert.assertEquals(1000000, mockCollector.get().size()); - } - - /** - * Used to debug window bucket lifecycle - * <p> - * Window period: PT1h, margin: 5s - * - * @throws InterruptedException - */ - @Test - public void testWithSortedEventsAndExpireBySystemTime() throws InterruptedException { - MockPartitionedCollector mockCollector = new MockPartitionedCollector(); - StreamTimeClockInLocalMemory timeClock = new StreamTimeClockInLocalMemory("sampleStream_1"); - Ordering<PartitionedEvent> timeOrdering = Ordering.from(PartitionedEventTimeOrderingComparator.INSTANCE); - StreamSortWindowHandlerImpl sortHandler = new StreamSortWindowHandlerImpl(); - sortHandler.prepare("sampleStream_1", MockSampleMetadataFactory.createSampleStreamSortSpec("sampleStream_1", "PT10s", 1000), mockCollector); - List<PartitionedEvent> sortedList = new LinkedList<>(); - - PartitionedEvent event = MockSampleMetadataFactory.createRandomSortedEventGroupedByName("sampleStream_1"); - sortHandler.nextEvent(event); - sortedList.add(event); - timeClock.moveForward(event.getTimestamp()); - sortHandler.onTick(timeClock, System.currentTimeMillis()); - - // Triggered to become expired by System time - sortHandler.onTick(timeClock, System.currentTimeMillis() + 10 * 1000 + 1000L + 1); - - Assert.assertTrue(timeOrdering.isOrdered(sortedList)); - Assert.assertTrue(timeOrdering.isOrdered(mockCollector.get())); - Assert.assertEquals(1, mockCollector.get().size()); - - sortHandler.close(); - } - - // @Test - public void testWithTimerLock() throws InterruptedException { - Timer timer = new Timer(); - List<Long> collected = new ArrayList<>(); - timer.schedule(new TimerTask() { - @Override - public void run() { - synchronized (collected) { - LOG.info("Ticking {}", DateTimeUtil.millisecondsToHumanDateWithMilliseconds(System.currentTimeMillis())); - collected.add(System.currentTimeMillis()); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - }, 0, 100); - } -} \ No newline at end of file
