http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
deleted file mode 100644
index b151e99..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ /dev/null
@@ -1,64 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-{
-  "topology": {
-    "name": "alertUnitTopology_1",
-    "numOfTotalWorkers": 2,
-    "numOfSpoutTasks": 1,
-    "numOfRouterBolts": 4,
-    "numOfAlertBolts": 10,
-    "numOfPublishTasks": 1,
-    "messageTimeoutSecs": 3600,
-    "localMode": "true"
-  },
-  "spout": {
-    "kafkaBrokerZkQuorum": "server.eagle.apache.org:2181",
-    "kafkaBrokerZkBasePath": "/kafka",
-    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
-    "stormKafkaTransactionZkQuorum": "",
-    "stormKafkaTransactionZkPath": "/consumers",
-    "stormKafkaEagleConsumer": "eagle_consumer"
-  },
-  "zkConfig": {
-    "zkQuorum": "server.eagle.apache.org:2181",
-    "zkRoot": "/alert"
-  },
-  "metadataService": {
-    "context": "/rest",
-    "host": "localhost",
-    "port": 9090,
-    mailSmtpServer = "",
-    mailSmtpPort = 25,
-    mailSmtpAuth = "false"
-    //mailSmtpConn = "plaintext",
-    //mailSmtpUsername = ""
-    //mailSmtpPassword = ""
-    //mailSmtpDebug = false
-  },
-  "metric": {
-    "sink": {
-      // "kafka": {
-      //  "topic": "alert_metric"
-      //  "bootstrap.servers": "localhost:6667"
-      // }
-      //      "stdout": {}
-      //      "elasticsearch": {
-      //        "hosts": ["localhost:9200"]
-      //        "index": "alert_metric"
-      //        "timestampField": "timestamp"
-      //      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
deleted file mode 100644
index 16569a4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/eagle.siddhiext
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-collect=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectAggregator
-collectWithDistinct=org.apache.eagle.alert.engine.siddhi.extension.AttributeCollectWithDistinctAggregator
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
deleted file mode 100644
index 47d476a..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootLogger=INFO, stdout
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-##log4j.logger.org.apache.eagle.alert.engine.spout.CorrelationSpout=DEBUG
-log4j.logger.org.apache.eagle.alert.metric=ERROR
-log4j.logger.org.apache.eagle.alert.engine.spout.SpoutOutputCollectorWrapper=DEBUG
-log4j.logger.org.apache.eagle.alert.engine.router.impl.StreamRouterBoltOutputCollector=DEBUG
-log4j.logger.org.apache.eagle.alert.engine.sorter.impl.StreamSortWindowHandlerImpl=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext
deleted file mode 100644
index 7569989..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/str.siddhiext
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
-coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
-concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
-length=org.wso2.siddhi.extension.string.LengthFunctionExtension
-lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
-regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
-repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
-replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
-replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
-reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
-strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
-substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
-trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
-upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
-hex=org.wso2.siddhi.extension.string.HexFunctionExtension
-unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-contains=org.wso2.siddhi.extension.string.ContainsFunctionExtension
-
-# Eagle Siddhi Extension
-equalsIgnoreCase=org.apache.eagle.alert.engine.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=org.apache.eagle.alert.engine.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=org.apache.eagle.alert.engine.siddhi.extension.RegexpIgnoreCaseFunctionExtension
-

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
deleted file mode 100644
index 24abae6..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceDriver.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.absence;
-
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceAlertDriver;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
-import org.junit.Test;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-/**
- * Since 7/8/16.
- */
-public class TestAbsenceDriver {
-    @Test
-    public void testAbsence() throws Exception {
-        // from 2PM to 3PM each day
-        AbsenceDailyRule rule = new AbsenceDailyRule();
-        rule.startOffset = 14 * 3600 * 1000;
-        rule.endOffset = 15 * 3600 * 1000;
-        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
-        List<Object> expectAttrs = Arrays.asList("host1");
-        AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
-
-        // first event came in 2016-07-08 11:20:00
-        String date = "2016-07-08 11:20:00";
-        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date d = df.parse(date);
-        long baseOccurTime = d.getTime();
-
-        // first event
-        driver.process(Arrays.asList("host2"), baseOccurTime);
-        // event after 1 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3600 * 1000);
-        // event after 2 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 2 * 3600 * 1000);
-        // event after 3 hour, enter this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3 * 3600 * 1000);
-        // event after 3.5 hour, still in this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3 * 3600 * 1000 + 1800 * 1000);
-        // event after 4 hour, exit this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 4 * 3600 * 1000);
-    }
-
-    @Test
-    public void testOccurrence() throws Exception {
-        // from 2PM to 3PM each day
-        AbsenceDailyRule rule = new AbsenceDailyRule();
-        rule.startOffset = 14 * 3600 * 1000;
-        rule.endOffset = 15 * 3600 * 1000;
-        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
-        List<Object> expectAttrs = Arrays.asList("host1");
-        AbsenceAlertDriver driver = new AbsenceAlertDriver(expectAttrs, generator);
-
-        // first event came in 2016-07-08 11:20:00
-        String date = "2016-07-08 11:20:00";
-        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date d = df.parse(date);
-        long baseOccurTime = d.getTime();
-
-        // first event
-        driver.process(Arrays.asList("host2"), baseOccurTime);
-        // event after 1 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3600 * 1000);
-        // event after 2 hour
-        driver.process(Arrays.asList("host2"), baseOccurTime + 2 * 3600 * 1000);
-        // event after 3 hour, enter this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 3 * 3600 * 1000);
-        // event after 3.5 hour, still in this window
-        driver.process(Arrays.asList("host1"), baseOccurTime + 3 * 3600 * 1000 + 1800 * 1000);
-        // event after 4 hour, exit this window
-        driver.process(Arrays.asList("host2"), baseOccurTime + 4 * 3600 * 1000);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
deleted file mode 100644
index 7bce34d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsencePolicyHandler.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.absence;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsencePolicyHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Since 7/8/16.
- */
-public class TestAbsencePolicyHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(TestAbsencePolicyHandler.class);
-    private static final String inputStream = "testInputStream";
-    private static final String outputStream = "testOutputStream";
-
-    @Test
-    public void test() throws Exception {
-        test(buildPolicyDef_provided());
-    }
-
-    public void test(PolicyDefinition pd) throws Exception {
-        Map<String, StreamDefinition> sds = new HashMap<>();
-        StreamDefinition sd = buildStreamDef();
-        sds.put("testInputStream", sd);
-        AbsencePolicyHandler handler = new AbsencePolicyHandler(sds);
-
-        PolicyHandlerContext context = new PolicyHandlerContext();
-        context.setPolicyDefinition(pd);
-        handler.prepare(new TestCollector(), context);
-
-        handler.send(buildStreamEvt(0, "job1", "running"));
-    }
-
-    private static class TestCollector implements Collector {
-        @Override
-        public void emit(Object o) {
-            AlertStreamEvent e = (AlertStreamEvent) o;
-            Object[] data = e.getData();
-            Assert.assertEquals("host2", data[1]);
-            LOG.info(e.toString());
-        }
-    }
-
-    private PolicyDefinition buildPolicyDef_provided() {
-        PolicyDefinition pd = new PolicyDefinition();
-        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
-        def.setValue("1,jobID,job1,daily_rule,14:00:00,15:00:00");
-        def.setType("absencealert");
-        pd.setDefinition(def);
-        pd.setInputStreams(Arrays.asList(inputStream));
-        pd.setOutputStreams(Arrays.asList(outputStream));
-        pd.setName("absencealert-test");
-        return pd;
-    }
-
-    private StreamDefinition buildStreamDef() {
-        StreamDefinition sd = new StreamDefinition();
-        StreamColumn tsColumn = new StreamColumn();
-        tsColumn.setName("timestamp");
-        tsColumn.setType(StreamColumn.Type.LONG);
-
-        StreamColumn hostColumn = new StreamColumn();
-        hostColumn.setName("jobID");
-        hostColumn.setType(StreamColumn.Type.STRING);
-
-        StreamColumn valueColumn = new StreamColumn();
-        valueColumn.setName("status");
-        valueColumn.setType(StreamColumn.Type.STRING);
-
-        sd.setColumns(Arrays.asList(tsColumn, hostColumn, valueColumn));
-        sd.setDataSource("testDataSource");
-        sd.setStreamId("testStreamId");
-        return sd;
-    }
-
-    private StreamEvent buildStreamEvt(long ts, String jobID, String status) {
-        StreamEvent e = new StreamEvent();
-        e.setData(new Object[] {ts, jobID, status});
-        e.setStreamId(inputStream);
-        e.setTimestamp(ts);
-        return e;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
deleted file mode 100644
index 4eda2ed..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowGenerator.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.absence;
-
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceDailyRule;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowGenerator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-/**
- * Since 7/8/16.
- */
-public class TestAbsenceWindowGenerator {
-    @Test
-    public void testWindowInToday() throws Exception {
-        AbsenceDailyRule rule = new AbsenceDailyRule();
-        // from 2PM to 3PM each day
-        rule.startOffset = 14 * 3600 * 1000;
-        rule.endOffset = 15 * 3600 * 1000;
-        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
-
-        // get current time
-        String date = "2016-07-08 00:00:00";
-        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date d = df.parse(date);
-        long startTimeOfDay = d.getTime();
-
-        String currDate = "2016-07-08 11:30:29";
-        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        d = df.parse(currDate);
-        AbsenceWindow window = generator.nextWindow(d.getTime());
-        Assert.assertEquals(startTimeOfDay + rule.startOffset, window.startTime);
-    }
-
-    @Test
-    public void testWindowInTomorrow() throws Exception {
-        AbsenceDailyRule rule = new AbsenceDailyRule();
-        // from 2PM to 3PM each day
-        rule.startOffset = 14 * 3600 * 1000;
-        rule.endOffset = 15 * 3600 * 1000;
-        AbsenceWindowGenerator generator = new AbsenceWindowGenerator(rule);
-
-        // get current time
-        String date = "2016-07-08 00:00:00";
-        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        Date d = df.parse(date);
-        long startTimeOfDay = d.getTime();
-
-        String currDate = "2016-07-08 18:20:19";
-        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        df.setTimeZone(TimeZone.getTimeZone("UTC"));
-        d = df.parse(currDate);
-        AbsenceWindow window = generator.nextWindow(d.getTime());
-        // this needs adjustment for one day
-        Assert.assertEquals(startTimeOfDay + rule.startOffset + AbsenceDailyRule.DAY_MILLI_SECONDS, window.startTime);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
deleted file mode 100644
index 32a614b..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/absence/TestAbsenceWindowProcessor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.absence;
-
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindow;
-import org.apache.eagle.alert.engine.evaluator.absence.AbsenceWindowProcessor;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Since 7/6/16.
- */
-public class TestAbsenceWindowProcessor {
-    @Test
-    public void testDataMissing() {
-        List<Object> expectedHosts = Arrays.asList("host1");
-        AbsenceWindow window = new AbsenceWindow();
-        window.startTime = 100L;
-        window.endTime = 200L;
-        AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
-        processor.process(Arrays.asList("host2"), 90);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
-        processor.process(Arrays.asList("host3"), 101);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
-        processor.process(Arrays.asList("host3"), 138);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
-        processor.process(Arrays.asList("host2"), 189);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
-        processor.process(Arrays.asList("host2"), 201);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.absent);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testDataExists() {
-        List<Object> expectedHosts = Arrays.asList("host1");
-        AbsenceWindow window = new AbsenceWindow();
-        window.startTime = 100L;
-        window.endTime = 200L;
-        AbsenceWindowProcessor processor = new AbsenceWindowProcessor(expectedHosts, window);
-        processor.process(Arrays.asList("host2"), 90);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
-        processor.process(Arrays.asList("host3"), 101);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.not_sure);
-        processor.process(Arrays.asList("host1"), 138);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
-        processor.process(Arrays.asList("host2"), 189);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
-        processor.process(Arrays.asList("host2"), 201);
-        Assert.assertEquals(processor.checkStatus(), AbsenceWindowProcessor.OccurStatus.occured);
-        Assert.assertEquals(processor.checkExpired(), true);
-        processor.process(Arrays.asList("host2"), 225);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
deleted file mode 100644
index 3f5fc67..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/CoordinatorClient.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.MediaType;
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * @since May 9, 2016
- */
-public class CoordinatorClient implements Closeable {
-
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(CoordinatorClient.class);
-
-    private static final String EAGLE_COORDINATOR_SERVICE_CONTEXT = "metadataService.context";
-    private static final String EAGLE_COORDINATOR_SERVICE_PORT = "metadataService.port";
-    private static final String EAGLE_COORDINATOR_SERVICE_HOST = "metadataService.host";
-    private static final String COORDINATOR_SCHEDULE_API = "/coordinator/build";
-
-    private String host;
-    private int port;
-    private String context;
-    private transient Client client;
-    private String basePath;
-
-    public CoordinatorClient(Config config) {
-        this(config.getString(EAGLE_COORDINATOR_SERVICE_HOST), config.getInt(EAGLE_COORDINATOR_SERVICE_PORT), config
-            .getString(EAGLE_COORDINATOR_SERVICE_CONTEXT));
-        basePath = buildBasePath();
-    }
-
-    public CoordinatorClient(String host, int port, String context) {
-        this.host = host;
-        this.port = port;
-        this.context = context;
-        this.basePath = buildBasePath();
-        ClientConfig cc = new DefaultClientConfig();
-        cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
-        cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
-        cc.getClasses().add(JacksonJsonProvider.class);
-        cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
-        this.client = Client.create(cc);
-        client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
-    }
-
-    private String buildBasePath() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("http://");
-        sb.append(host);
-        sb.append(":");
-        sb.append(port);
-        sb.append(context);
-        return sb.toString();
-    }
-
-    public String schedule() {
-        WebResource r = client.resource(basePath + COORDINATOR_SCHEDULE_API);
-        return r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(String.class);
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.client.destroy();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
deleted file mode 100644
index 01e50a4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.UnitTopologyMain;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-
-/**
- * Case of simple
- *
- * @since May 8, 2016
- */
-public class Integration1 {
-    private static final String SIMPLE_CONFIG = 
"/simple/application-integration.conf";
-    private static final Logger LOG = 
LoggerFactory.getLogger(Integration1.class);
-    private static final ObjectMapper om = new ObjectMapper();
-
-    public static void main(String[] args) throws Exception {
-        Integration1 inte = new Integration1();
-        inte.args = args;
-        inte.test_simple_threshhold();
-    }
-
-    private String[] args;
-    private ExecutorService executors = Executors.newFixedThreadPool(5, new 
ThreadFactory() {
-
-        @Override
-        public Thread newThread(Runnable r) {
-            Thread t = new Thread(r);
-            t.setDaemon(true);
-            return t;
-        }
-    });
-    private static KafkaEmbedded kafka;
-
-    @BeforeClass
-    public static void setup() {
-//        kafka = new KafkaEmbedded(9092, 2181);
-//        makeSureTopic("perfmon_metrics");
-    }
-
-    @AfterClass
-    public static void end() {
-        if (kafka != null) {
-            kafka.shutdown();
-        }
-    }
-
-    /**
-     * Assumption:
-     * <p>
-     * start metadata service 8080 /rest
-     * <p>
-     * datasources : perfmon_datasource
-     * <p>
-     * stream: perfmon_cpu
-     * <p>
-     * policy : perfmon_cpu_host_check / perfmon_cpu_pool_check
-     * <p>
-     * Create topic
-     * liasu@xxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1 --topic perfmon_metrics
-     * <p>
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void test_simple_threshhold() throws Exception {
-        System.setProperty("config.resource", SIMPLE_CONFIG);
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-
-        System.out.println("loading metadatas...");
-        loadMetadatas("/simple/", config);
-        System.out.println("loading metadatas done!");
-
-        if (args == null) {
-            args = new String[] {"-c", "simple/application-integration.conf"};
-        }
-
-        executors.submit(() -> SampleClient1.main(args));
-
-        executors.submit(() -> {
-            try {
-                UnitTopologyMain.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-        Utils.sleep(1000 * 5l);
-        while (true) {
-            proactive_schedule(config);
-
-            Utils.sleep(1000 * 60l * 5);
-        }
-    }
-
-    public static void proactive_schedule(Config config) throws Exception {
-        try (CoordinatorClient cc = new CoordinatorClient(config)) {
-            try {
-                String resp = cc.schedule();
-                LOG.info("schedule return : {} ", resp);
-            } catch (Exception e) {
-                LOG.error("failed to call schedule!", e);
-            }
-        }
-    }
-
-    public static void loadMetadatas(String base, Config config) throws 
Exception {
-        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
-        client.clear();
-
-        List<Kafka2TupleMetadata> metadata = loadEntities(base + 
"datasources.json", Kafka2TupleMetadata.class);
-        for (Kafka2TupleMetadata k : metadata) {
-            client.addDataSource(k);
-        }
-
-        List<PolicyDefinition> policies = loadEntities(base + "policies.json", 
PolicyDefinition.class);
-        for (PolicyDefinition p : policies) {
-            client.addPolicy(p);
-        }
-
-        List<Publishment> pubs = loadEntities(base + "publishments.json", 
Publishment.class);
-        for (Publishment pub : pubs) {
-            client.addPublishment(pub);
-        }
-
-        List<StreamDefinition> defs = loadEntities(base + 
"streamdefinitions.json", StreamDefinition.class);
-        for (StreamDefinition def : defs) {
-            client.addStreamDefinition(def);
-        }
-
-        List<Topology> topos = loadEntities(base + "topologies.json", 
Topology.class);
-        for (Topology t : topos) {
-            client.addTopology(t);
-        }
-
-        client.close();
-    }
-
-    public static <T> List<T> loadEntities(String path, Class<T> tClz) throws 
Exception {
-        JavaType type = CollectionType.construct(List.class, 
SimpleType.construct(tClz));
-        List<T> l = om.readValue(Integration1.class.getResourceAsStream(path), 
type);
-        return l;
-    }
-
-    /**
-     * <p>
-     * {"name":"xxx","numOfSpout":1,"numOfAlertBolt":3,"numOfGroupBolt":2,
-     * "spoutId"
-     * :"xxx-spout","groupNodeIds":["xxx-grp"],"alertBoltIds":["xxx-bolt"
-     * ],"pubBoltId":"xxx-pubBolt","spoutParallelism":1,"groupParallelism":1,
-     * "alertParallelism":1}
-     * <p>
-     *
-     * @throws Exception
-     */
-    @Ignore
-    @Test
-    public void testJson() throws Exception {
-        {
-            JavaType type = CollectionType.construct(List.class, 
SimpleType.construct(Topology.class));
-            List<Topology> l = 
om.readValue(Integration1.class.getResourceAsStream("/simple/topologies.json"),
-                type);
-            Topology t = (Topology) l.get(0);
-
-            Assert.assertEquals(4, t.getGroupNodeIds().size());
-            Assert.assertEquals(10, t.getAlertBoltIds().size());
-        }
-
-        {
-            JavaType type = CollectionType.construct(List.class, 
SimpleType.construct(Publishment.class));
-            // publishment
-            List<Publishment> l = 
om.readValue(Integration1.class.getResourceAsStream("/simple/publishments.json"),
 type);
-            Publishment p = l.get(0);
-            Assert.assertEquals("KAFKA", p.getType());
-        }
-
-        checkAll("/simple/");
-        checkAll("/correlation/");
-    }
-
-    public static void checkAll(String base) throws Exception {
-        loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
-        loadEntities(base + "policies.json", PolicyDefinition.class);
-        loadEntities(base + "publishments.json", Publishment.class);
-        loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
-        loadEntities(base + "topologies.json", Topology.class);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
deleted file mode 100644
index 5c2c404..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration2.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.UnitTopologyMain;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @since May 10, 2016
- */
-public class Integration2 {
-
-    private ExecutorService executors = Executors.newFixedThreadPool(5);
-
-    /**
-     * <pre>
-     * Create topic
-     * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1 --topic eslogs
-     * liasu@xxxx:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1 --topic bootfailures
-     * </pre>
-     *
-     * @param args
-     */
-    public static void main(String[] args) throws Exception {
-        Integration2 inte = new Integration2();
-        inte.args = args;
-        inte.test_start();
-    }
-
-    private String[] args;
-
-    @Ignore
-    @Test
-    public void test_start() throws Exception {
-        System.setProperty("config.resource", 
"/correlation/application-integration-2.conf");
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-        Integration1.loadMetadatas("/correlation/", config);
-
-        executors.submit(() -> {
-            try {
-                UnitTopologyMain.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-        executors.submit(() -> SampleClient2.main(args));
-
-        Utils.sleep(1000 * 5l);
-        while (true) {
-            Integration1.proactive_schedule(config);
-            Utils.sleep(1000 * 60l * 5);
-        }
-    }
-
-    @Test
-    @Ignore
-    public void test3() throws Exception {
-        SiddhiManager sm = new SiddhiManager();
-        String s1 = " define stream esStream(instanceUuid string, timestamp 
long, logLevel string, message string, reqId string, host string, component 
string); ";
-        s1 += " define stream ifStream(instanceUuid string, timestamp long, 
reqId string, message string, host string); ";
-        s1 += "from esStream#window.externalTime(timestamp, 20 min) as a join 
ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == 
b.instanceUuid  within 10 min select logLevel, a.host as aHost, a.component, 
a.message as logMessage, b.message as failMessage, a.timestamp as t1, 
b.timestamp as t2, b.host as bHost, count(1) as errorCount group by component 
insert into log_stream_join_output; ";
-        ExecutionPlanRuntime epr = sm.createExecutionPlanRuntime(s1);
-
-        epr.addCallback("log_stream_join_output", new StreamCallback() {
-            @Override
-            public void receive(Event[] arg0) {
-                System.out.println("join result!");
-                EventPrinter.print(arg0);
-            }
-        });
-
-        InputHandler input1 = epr.getInputHandler("esStream");
-        InputHandler input2 = epr.getInputHandler("ifStream");
-
-        epr.start();
-
-        long base = 1462880695837l;
-
-        while (true) {
-            sendEvent(input1, input2, base);
-
-            base = base + 3000;
-
-            Utils.sleep(3000);
-        }
-
-    }
-
-    private void sendEvent(InputHandler input1, InputHandler input2, long 
base) throws InterruptedException {
-        {
-            Event e = new Event();
-            e.setTimestamp(base);
-            e.setData(new Object[] {
-                "instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
-                base,
-                "ERROR",
-                "NullPointException",
-                "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
-                "nova.host",
-                "NOVA"
-            });
-            input1.send(e);
-        }
-
-        {
-            Event e = new Event();
-            e.setTimestamp(base);
-            e.setData(new Object[] 
{"instance-guid-c2a1c926-b590-418e-bf57-41469d7891fa",
-                base,
-                "req-id-82dab92c-9e45-4ad8-8793-96e912831f00",
-                "boot failure for when try start the given vm!",
-                "boot-vm-data-center.corp.com"});
-            input2.send(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
deleted file mode 100644
index 41dbe97..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.UnitTopologyMain;
-import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @since May 10, 2016
- */
-public class Integration3 {
-
-    private String[] args;
-    private ExecutorService executors = Executors.newFixedThreadPool(5);
-    private static KafkaEmbedded kafka;
-
-    @BeforeClass
-    public static void setup() {
-        // FIXME : start local kafka
-    }
-
-    @AfterClass
-    public static void end() {
-        if (kafka != null) {
-            kafka.shutdown();
-        }
-    }
-
-    /**
-     * Assumption:
-     * <p>
-     * start metadata service 8080 /rest
-     * <p>
-     * <pre>
-     * user@kafka-host:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 
localhost:2181 --replication-factor 1 --partitions 1 --topic syslog_events
-     * </pre>
-     * <p>
-     *
-     * @throws InterruptedException
-     */
-    @Test
-    public void testSeverity() throws Exception {
-        System.setProperty("config.resource", "/e2e/application-e2e.conf");
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-
-        System.out.println("loading metadatas...");
-        Integration1.loadMetadatas("/e2e/", config);
-        System.out.println("loading metadatas done!");
-
-        // send sample sherlock data
-        executors.submit(() -> {
-            try {
-                SampleClient3.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-        executors.submit(() -> {
-            try {
-                UnitTopologyMain.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-        Utils.sleep(1000 * 5l);
-        while (true) {
-            Integration1.proactive_schedule(config);
-
-            Utils.sleep(1000 * 60l * 5);
-        }
-    }
-
-    @Test
-    public void testJson() throws Exception {
-        Integration1.checkAll("/e2e/");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
deleted file mode 100644
index 59af759..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration4NoDataAlert.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.UnitTopologyMain;
-import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Since 6/29/16.
- */
-public class Integration4NoDataAlert {
-    private String[] args;
-
-    private ExecutorService executors = Executors.newFixedThreadPool(5);
-
-    private static KafkaEmbedded kafka;
-
-    @BeforeClass
-    public static void setup() {
-        // FIXME : start local kafka
-    }
-
-    @AfterClass
-    public static void end() {
-        if (kafka != null) {
-            kafka.shutdown();
-        }
-    }
-
-    @Test
-    public void testTriggerNoData() throws Exception {
-        System.setProperty("config.resource", 
"/nodata/application-nodata.conf");
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-
-        System.out.println("loading metadatas...");
-        Integration1.loadMetadatas("/nodata/", config);
-        System.out.println("loading metadatas done!");
-
-
-        executors.submit(() -> {
-            try {
-                UnitTopologyMain.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-        // wait 20 seconds for topology to bring up
-        try {
-            Thread.sleep(20000);
-        } catch (Exception ex) {
-        }
-
-        // send mock data
-        executors.submit(() -> {
-            try {
-                SampleClient4NoDataAlert.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-
-        Utils.sleep(1000 * 5l);
-        while (true) {
-            Integration1.proactive_schedule(config);
-
-            Utils.sleep(1000 * 60l * 5);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
deleted file mode 100644
index e656c31..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.engine.UnitTopologyMain;
-import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Since 6/29/16.
- */
-public class Integration5AbsenceAlert {
-    private String[] args;
-
-    private ExecutorService executors = Executors.newFixedThreadPool(5);
-
-    private static KafkaEmbedded kafka;
-
-    @BeforeClass
-    public static void setup() {
-        // FIXME : start local kafka
-    }
-
-    @AfterClass
-    public static void end() {
-        if (kafka != null) {
-            kafka.shutdown();
-        }
-    }
-
-    @Test
-    public void testTriggerAbsenceAlert() throws Exception {
-        System.setProperty("config.resource", 
"/absence/application-absence.conf");
-        ConfigFactory.invalidateCaches();
-        Config config = ConfigFactory.load();
-
-        System.out.println("loading metadatas...");
-        Integration1.loadMetadatas("/absence/", config);
-        System.out.println("loading metadatas done!");
-
-
-        executors.submit(() -> {
-            try {
-                UnitTopologyMain.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-        // wait 20 seconds for topology to bring up
-        try {
-            Thread.sleep(20000);
-        } catch (Exception ex) {
-        }
-
-        // send mock data
-        executors.submit(() -> {
-            try {
-                SampleClient5AbsenceAlert.main(args);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-
-        Utils.sleep(1000 * 5l);
-        while (true) {
-            Integration1.proactive_schedule(config);
-
-            Utils.sleep(1000 * 60l * 5);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
deleted file mode 100644
index 86de439..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/MetadataServiceClientImpTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.e2e;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-public class MetadataServiceClientImpTest {
-
-    @Test
-    @Ignore
-    public void test() {
-        System.out.println("loading metadatas...");
-        try {
-            System.setProperty("config.resource", 
"/application-integration.conf");
-            ConfigFactory.invalidateCaches();
-            Config config = ConfigFactory.load();
-            loadMetadatas("/", config);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-        System.out.println("loading metadatas done!");
-    }
-
-    private void loadMetadatas(String base, Config config) throws Exception {
-        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
-        client.clear();
-
-        List<Kafka2TupleMetadata> metadata = Integration1.loadEntities(base + 
"datasources.json", Kafka2TupleMetadata.class);
-        client.addDataSources(metadata);
-
-        List<PolicyDefinition> policies = Integration1.loadEntities(base + 
"policies.json", PolicyDefinition.class);
-        client.addPolicies(policies);
-
-        List<Publishment> pubs = Integration1.loadEntities(base + 
"publishments.json", Publishment.class);
-        client.addPublishments(pubs);
-
-        List<StreamDefinition> defs = Integration1.loadEntities(base + 
"streamdefinitions.json", StreamDefinition.class);
-        client.addStreamDefinitions(defs);
-
-        List<Topology> topos = Integration1.loadEntities(base + 
"topologies.json", Topology.class);
-        client.addTopologies(topos);
-
-        client.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
deleted file mode 100644
index acb8ff8..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient1.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @since May 9, 2016
- */
-public class SampleClient1 {
-    @SuppressWarnings("unused")
-    private static final Logger LOG = 
LoggerFactory.getLogger(SampleClient1.class);
-
-    private static final String PERFMON_CPU_STREAM = "perfmon_cpu_stream";
-    private static final String PERFMON_MEM_STREAM = "perfmon_mem_stream";
-
-    //    private static int hostIndx = 1;
-    private static String hostTemp = "host-000%d.datacenter.corp.com";
-
-    /**
-     * <pre>
-     * {"host": "", "timestamp" : "", "metric" : "", "pool": "", "value": 1.0, 
"colo": "phx"}
-     * </pre>
-     */
-    public static class Entity {
-        public String host;
-        public long timestamp;
-        public String metric;
-        public String pool;
-        public double value;
-        public String colo;
-    }
-
-    public static void main(String[] args) {
-        long base = System.currentTimeMillis();
-        AtomicLong msgCount = new AtomicLong();
-
-        Config config = ConfigFactory.load();
-        try (KafkaProducer<String, String> proceduer = 
createProceduer(config)) {
-            while (true) {
-                int hostIndex = 6;
-                for (int i = 0; i < hostIndex; i++) {
-                    base = send_metric(base, proceduer, PERFMON_CPU_STREAM, i);
-                    msgCount.incrementAndGet();
-                    base = send_metric(base, proceduer, PERFMON_MEM_STREAM, i);
-                    msgCount.incrementAndGet();
-                }
-
-                if ((msgCount.get() % 600) == 0) {
-                    System.out.println("send 600 CPU/MEM metric!");
-                }
-
-                Utils.sleep(3000);
-            }
-        }
-    }
-
-    private static long send_metric(long base, KafkaProducer<String, String> 
proceduer, String stream, int hostIndex) {
-
-        Pair<Long, String> pair = createEntity(base, stream, hostIndex);
-        base = pair.getKey();
-        ProducerRecord<String, String> record = new ProducerRecord<String, 
String>("perfmon_metrics",
-            pair.getRight());
-        proceduer.send(record);
-        return base;
-    }
-
-    private static Pair<Long, String> createEntity(long base, String stream, 
int hostIndex) {
-        // TODO : add randomization
-        Entity e = new Entity();
-        e.colo = "LVS";
-        e.host = String.format(hostTemp, hostIndex);
-        if (hostIndex < 3) {
-            e.pool = "hadoop-eagle-prod";
-        } else {
-            e.pool = "raptor-pool1";
-        }
-        e.timestamp = base;
-        e.metric = stream;
-        e.value = 92.0;
-
-        base = base + 1000;
-
-        return Pair.of(base, JsonUtils.writeValueAsString(e));
-    }
-
-    public static KafkaProducer<String, String> createProceduer(Config config) 
{
-        String servers = config.getString("kafkaProducer.bootstrapServers");
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", servers);
-        // configMap.put("metadata.broker.list", broker_list);
-        configMap.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaProducer<String, String> proceduer = new KafkaProducer<String, 
String>(configMap);
-        return proceduer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
deleted file mode 100644
index 8a2a639..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient2.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @since May 10, 2016
- */
-public class SampleClient2 {
-
-//    private static final Logger LOG = 
LoggerFactory.getLogger(SampleClient2.class);
-
-    public static class LogEntity {
-        public String instanceUuid;
-        public long timestamp;
-        public String logLevel;
-        public String message;
-        public String reqId;
-        public String host;
-        public String component;
-    }
-
-    public static class IfEntity {
-        public String instanceUuid;
-        public long timestamp;
-        public String reqId;
-        public String message;
-        public String host;
-    }
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-        AtomicLong base1 = new AtomicLong(System.currentTimeMillis());
-        AtomicLong base2 = new AtomicLong(System.currentTimeMillis());
-        AtomicLong count = new AtomicLong();
-
-        Config config = ConfigFactory.load();
-
-        try (KafkaProducer<String, String> proceduer = 
SampleClient1.createProceduer(config)) {
-            while (true) {
-                nextUuid = String.format(instanceUuidTemp, 
UUID.randomUUID().toString());
-                nextReqId = String.format(reqIdTemp, 
UUID.randomUUID().toString());
-
-                int hostIndex = 6;
-                for (int i = 0; i < hostIndex; i++) {
-                    sendMetric(base1, base2, count, proceduer, i);
-                }
-
-                if (count.get() % 600 == 0) {
-                    System.out.println("send 600 LOG/FAILURE metric!");
-                }
-
-                Utils.sleep(3000);
-
-            }
-        }
-    }
-
-    private static void sendMetric(AtomicLong base1, AtomicLong base2, 
AtomicLong count,
-                                   KafkaProducer<String, String> proceduer, 
int i) {
-        {
-            Pair<Long, String> pair = createLogEntity(base1, i);
-            ProducerRecord<String, String> logRecord = new 
ProducerRecord<>("eslogs", pair.getRight());
-            proceduer.send(logRecord);
-            count.incrementAndGet();
-        }
-        {
-            Pair<Long, String> pair2 = createFailureEntity(base2, i);
-            ProducerRecord<String, String> failureRecord = new 
ProducerRecord<>("bootfailures", pair2.getRight());
-            proceduer.send(failureRecord);
-            count.incrementAndGet();
-        }
-    }
-
-    private static String instanceUuidTemp = "instance-guid-%s";
-    private static String reqIdTemp = "req-id-%s";
-    private static String nextUuid;
-    private static String nextReqId;
-
-    private static Pair<Long, String> createLogEntity(AtomicLong base1, int 
hostIndex) {
-        // TODO: add randomization
-        LogEntity le = new LogEntity();
-        if (hostIndex < 3) {
-            le.component = "NOVA";
-            le.host = "nova.000-" + hostIndex + ".datacenter.corp.com";
-            le.message = "RabbitMQ Exception - MQ not connectable!";
-        } else {
-            le.component = "NEUTRON";
-            le.host = "neturon.000-" + (hostIndex - 3) + 
".datacenter.corp.com";
-            le.message = "DNS Exception - Fail to connect to DNS!";
-        }
-        le.instanceUuid = nextUuid;
-        le.logLevel = "ERROR";
-        le.reqId = nextReqId;
-        le.timestamp = base1.get();
-
-        base1.addAndGet(1000);// simply some interval.
-        return Pair.of(base1.get(), JsonUtils.writeValueAsString(le));
-    }
-
-    private static Pair<Long, String> createFailureEntity(AtomicLong base, int 
hostIndex) {
-        // TODO: add randomization
-        IfEntity ie = new IfEntity();
-        ie.host = "boot-vm-0-" + hostIndex + ".datacenter.corp.com";
-        ie.instanceUuid = nextUuid;
-        ie.message = "boot failure for when try start the given vm!";
-        ie.reqId = nextReqId;
-        ie.timestamp = base.get();
-
-        base.addAndGet(2000);// simply some interval.
-        return Pair.of(base.get(), JsonUtils.writeValueAsString(ie));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
deleted file mode 100644
index 80fac3d..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient3.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.kafka.clients.producer.KafkaProducer;
-
-import java.util.Properties;
-
-/**
- * @since Jun 12, 2016
- */
-public class SampleClient3 {
-
-
-    @SuppressWarnings( {"unchecked", "rawtypes"})
-    public static void main(String[] args) throws Exception {
-        System.setProperty("config.resource", "/e2e/application-e2e.conf");
-        ConfigFactory.invalidateCaches();
-
-        Config config = ConfigFactory.load();
-        KafkaProducer producer = createByteProceduer(config);
-
-//        TimeSeriesDataSchemaManager manager = TimeSeriesDataSchemaUtils
-//                
.createManagerFromCGMJSONStream(SherlockEventScheme.class.getResourceAsStream("/e2e/sherlock.json"));
-//        Serializer serializer = ProtobufSerializer.newInstance(manager);
-//
-//        while (true) {
-//            SherlockEvent event = createEvent(manager);
-//            ProducerRecord record = new ProducerRecord("syslog_events", 
serializer.writeValueAsBytes(event));
-//            producer.send(record);
-//
-//            Utils.sleep(3000);
-//        }
-    }
-
-    //    private static SherlockEvent createEvent(TimeSeriesDataSchemaManager 
manager) throws Exception {
-//
-//        SherlockEventBuilder builder = SherlockEvent.newBuilder();
-//        builder.setEpochMillis(System.currentTimeMillis());
-//        builder.setSchema(manager.getSchema("syslog", "parsed"));
-//        // dim
-//        DimTagsBuilder dimBuilder = DimTags.newBuilder();
-//        dimBuilder.add("facility", "USER");
-//        dimBuilder.add("severity", "NOTICE");
-//        dimBuilder.add("hostname", "LM-SJC-11000548");
-//        dimBuilder.add("msgid", "-");
-//        builder.setDimTagsBuilder(dimBuilder);
-//
-//        MapdataValueBuilder mvBuilder = MapdataValue.newBuilder();
-//        mvBuilder.put("timestamp", 
StringValue.newBuilder().setValue("04/Dec/2015:11:54:23 -0700").build());
-//        mvBuilder.put("conn", 
StringValue.newBuilder().setValue("293578221").build());
-//        mvBuilder.put("op", StringValue.newBuilder().setValue("1").build());
-//        mvBuilder.put("msgId", 
StringValue.newBuilder().setValue("2").build());
-//        mvBuilder.put(
-//                "command",
-//                StringValue.newBuilder()
-//                        .setValue("RESULT err=0 tag=101 nentries=1 etime=0 
additional alert line ha ha ha").build());
-//
-//        builder.setValueBuilder(mvBuilder);
-//
-//        return builder.build();
-//    }
-//    
-    public static KafkaProducer<String, String> createByteProceduer(Config 
config) {
-        String servers = config.getString("kafkaProducer.bootstrapServers");
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", servers);
-        configMap.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
-        configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        KafkaProducer<String, String> proceduer = new KafkaProducer<String, 
String>(configMap);
-        return proceduer;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6fd95d5c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
deleted file mode 100644
index 2123571..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/SampleClient4NoDataAlert.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.e2e;
-
-import backtype.storm.utils.Utils;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * Since 6/29/16.
- */
-@SuppressWarnings( {"rawtypes", "unchecked"})
-public class SampleClient4NoDataAlert {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SampleClient4NoDataAlert.class);
-    private static long currentTimestamp = 1467240000000L;
-    private static long interval = 3000L;
-    private static volatile boolean host1Muted = false;
-    private static volatile boolean host2Muted = false;
-
-    public static void main(String[] args) throws Exception {
-        System.setProperty("config.resource", 
"/nodata/application-nodata.conf");
-        ConfigFactory.invalidateCaches();
-
-        Config config = ConfigFactory.load();
-        KafkaProducer producer = createProducer(config);
-        ProducerRecord record = null;
-        Thread x = new MuteThread();
-        x.start();
-        while (true) {
-            if (!host1Muted) {
-                record = new ProducerRecord("noDataAlertTopic", 
createEvent("host1"));
-                producer.send(record);
-            }
-            if (!host2Muted) {
-                record = new ProducerRecord("noDataAlertTopic", 
createEvent("host2"));
-                producer.send(record);
-            }
-            record = new ProducerRecord("noDataAlertTopic", 
createEvent("host3"));
-            producer.send(record);
-            Utils.sleep(interval);
-            currentTimestamp += interval;
-        }
-    }
-
-    private static class MuteThread extends Thread {
-        @Override
-        public void run() {
-            try {
-                // sleep 10 seconds
-                Thread.sleep(10000);
-                // mute host1
-                LOG.info("mute host1");
-                host1Muted = true;
-                // sleep 70 seconds for triggering no data alert
-                LOG.info("try to sleep 70 seconds for triggering no data 
alert");
-                Thread.sleep(70000);
-                // unmute host1
-                LOG.info("unmute host1");
-                host1Muted = false;
-                Thread.sleep(10000);
-                // mute host2
-                LOG.info("mute host2");
-                host2Muted = true;
-                // sleep 70 seconds for triggering no data alert
-                LOG.info("try to sleep 70 seconds for triggering no data 
alert");
-                Thread.sleep(70000);
-                LOG.info("unmute host2");
-                host2Muted = false;
-            } catch (Exception ex) {
-                ex.printStackTrace();
-            }
-        }
-    }
-
-    private static class NoDataEvent {
-        @JsonProperty
-        long timestamp;
-        @JsonProperty
-        String host;
-        @JsonProperty
-        double value;
-
-        public String toString() {
-            return "timestamp=" + timestamp + ",host=" + host + ",value=" + 
value;
-        }
-    }
-
-    private static String createEvent(String host) throws Exception {
-        NoDataEvent e = new NoDataEvent();
-        long expectTS = currentTimestamp + interval;
-        // adjust back 1 second random
-        long adjust = Math.round(2 * Math.random());
-        e.timestamp = expectTS - adjust;
-        e.host = host;
-        e.value = 25.6;
-        LOG.info("sending event {} ", e);
-        ObjectMapper mapper = new ObjectMapper();
-        String value = mapper.writeValueAsString(e);
-        return value;
-    }
-
-
-    public static KafkaProducer<String, String> createProducer(Config config) {
-        String servers = config.getString("kafkaProducer.bootstrapServers");
-        Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", servers);
-        configMap.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-        configMap.put("request.required.acks", "1");
-        configMap.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
-        configMap.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaProducer<String, String> proceduer = new KafkaProducer<String, 
String>(configMap);
-        return proceduer;
-    }
-}

Reply via email to