http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
deleted file mode 100644
index 25e5cff..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.config.DeduplicatorConfig;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import scala.Tuple2;
-
-public abstract class AlertDeduplicationExecutorBase extends 
JavaStormStreamExecutor2<String, AlertAPIEntity> implements 
PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
-       private static final long serialVersionUID = 1L;
-       private static final Logger LOG = 
LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
-       protected Config config;
-       protected DEDUP_TYPE dedupType;
-
-       private List<String> alertExecutorIdList;
-       private volatile CopyOnWriteHashMap<String, DefaultDeduplicator> 
alertDedups;
-       private PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao;
-
-       public enum DEDUP_TYPE {
-               ENTITY,
-               EMAIL
-       }
-
-       public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, 
DEDUP_TYPE dedupType, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
-               this.alertExecutorIdList = alertExecutorIdList;
-               this.dedupType = dedupType;
-               this.dao = dao;
-       }
-       
-       @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-       
-       public DefaultDeduplicator createAlertDedup(AlertDefinitionAPIEntity 
alertDef) {
-               DeduplicatorConfig dedupConfig = null;
-               try {
-                       dedupConfig = 
JsonSerDeserUtils.deserialize(alertDef.getDedupeDef(), 
DeduplicatorConfig.class);
-               }
-               catch (Exception ex) {
-                       LOG.warn("Initial dedup Config error, " + 
ex.getMessage());
-               }
-
-        if (dedupConfig != null) {
-                       return new 
DefaultDeduplicator(dedupConfig.getAlertDedupIntervalMin(), 
dedupConfig.getFields());
-               }
-
-               return null;
-       }
-       
-       @Override
-       public void init() {            
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.SITE);
-        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.APPLICATION);
-           Map<String, Map<String, AlertDefinitionAPIEntity>> 
initialAlertDefs;                    
-           try {
-                       initialAlertDefs = 
dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
-           }
-           catch (Exception ex) {
-                       LOG.error("fail to initialize initialAlertDefs: ", ex);
-               throw new IllegalStateException("fail to initialize 
initialAlertDefs: ", ex);
-        }
-           Map<String, DefaultDeduplicator> tmpDeduplicators = new HashMap<>();
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions was found for site: "+site+", 
dataSource: "+dataSource);
-        } else {
-                   for (String alertExecutorId: alertExecutorIdList) {
-                           if(initialAlertDefs.containsKey(alertExecutorId)){
-                    for(AlertDefinitionAPIEntity alertDef : 
initialAlertDefs.get(alertExecutorId).values()){
-                       try {
-                          DefaultDeduplicator deduplicator = 
createAlertDedup(alertDef);
-                          if (deduplicator != null)
-                              
tmpDeduplicators.put(alertDef.getTags().get(Constants.POLICY_ID), deduplicator);
-                          else LOG.warn("The dedup interval is not set, 
alertDef: " + alertDef);
-                        }
-                        catch (Throwable t) {
-                            LOG.error("Got an exception when initial dedup 
config, probably dedup config is not set: " + t.getMessage() + "," + alertDef);
-                        }
-                    }
-                } else {
-                    LOG.info(String.format("No alert definitions found for 
site: %s, dataSource: %s, alertExecutorId: 
%s",site,dataSource,alertExecutorId));
-                }
-                   }
-        }
-
-               alertDedups = new CopyOnWriteHashMap<>();
-               alertDedups.putAll(tmpDeduplicators);
-               DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = 
DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
-               policyLoader.init(initialAlertDefs, dao, config);
-               for (String alertExecutorId : alertExecutorIdList) {
-                       policyLoader.addPolicyChangeListener(alertExecutorId, 
this);
-               }
-       }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, 
AlertAPIEntity>> outputCollector){
-        String policyId = (String) input.get(0);
-        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-        DefaultDeduplicator dedup;
-        synchronized(alertDedups) {
-            dedup = alertDedups.get(policyId);
-        }
-
-        List<AlertAPIEntity> ret = Arrays.asList(alertEntity);
-        if (dedup == null) {
-            LOG.warn("Dedup config for policyId " + policyId + " is not set or 
is not a valid config");
-        } else {
-            if (dedup.getDedupIntervalMin() == -1) {
-                LOG.warn("the dedup interval is set as -1, which mean all 
alerts should be deduped(skipped)");
-                return;
-            }
-            ret = dedup.dedup(ret);
-        }
-        for (AlertAPIEntity entity : ret) {
-            outputCollector.collect(new Tuple2(policyId, entity));
-        }
-    }
-
-       public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> 
added) {
-               if(LOG.isDebugEnabled()) LOG.debug("Alert dedup config to be 
added : " + added);
-               for(AlertDefinitionAPIEntity alertDef : added.values()){
-                       LOG.info("Alert dedup config really added " + alertDef);
-                       DefaultDeduplicator dedup = createAlertDedup(alertDef);
-                       if (dedup != null) {
-                               synchronized(alertDedups) {             
-                                       
alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
-                               }
-                       }
-               }
-       }
-       
-       public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> 
changed) {
-               LOG.info("Alert dedup config changed : " + changed);
-               for(AlertDefinitionAPIEntity alertDef : changed.values()){
-                       LOG.info("Alert dedup config really changed " + 
alertDef);
-                       DefaultDeduplicator dedup = createAlertDedup(alertDef);
-                       if (dedup != null) {
-                               synchronized(alertDedups) {
-                                       
alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
-                               }
-                       }
-               }
-       }
-       
-       public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> 
deleted) {
-               LOG.info("alert dedup config deleted : " + deleted);
-               for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-                       LOG.info("alert dedup config deleted " + alertDef);
-                       // no cleanup to do, just remove it
-                       synchronized(alertDedups) {             
-                               
alertDedups.remove(alertDef.getTags().get(Constants.POLICY_ID));
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
deleted file mode 100644
index 8947d2c..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-
-import java.util.List;
-
-public class AlertEmailDeduplicationExecutor extends 
AlertDeduplicationExecutorBase {
-
-       private static final long serialVersionUID = 1L;
-
-       public AlertEmailDeduplicationExecutor(List<String> 
alertExecutorIdList, PolicyDefinitionDAO dao){
-               super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
deleted file mode 100644
index b30dbda..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.List;
-
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public class AlertEntityDeduplicationExecutor extends 
AlertDeduplicationExecutorBase {
-
-       private static final long serialVersionUID = 1L;
-
-       public AlertEntityDeduplicationExecutor(List<String> 
alertExecutorIdList, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
-               super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
deleted file mode 100644
index 1d79f9f..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/DefaultDeduplicator.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.common.metric.AlertContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DefaultDeduplicator implements EntityDeduplicator {
-       protected long dedupIntervalMin;
-       protected List<String> fields;
-       protected Map<EntityDedupKey, Long> entites = new HashMap<>();
-       public static Logger LOG = 
LoggerFactory.getLogger(DefaultDeduplicator.class);
-       
-       public static enum AlertDeduplicationStatus{
-               NEW,
-               DUPLICATED,
-               IGNORED
-       }
-       
-       public DefaultDeduplicator() {
-               this.dedupIntervalMin = 0;
-               fields = null;
-       }
-
-       public DefaultDeduplicator(long intervalMin, List<String> fields) {
-               this.dedupIntervalMin = intervalMin;
-               this.fields = fields;
-       }
-       
-       public void clearOldCache() {
-               List<EntityDedupKey> removedkeys = new ArrayList<>();
-               for (Entry<EntityDedupKey, Long> entry : entites.entrySet()) {
-                       EntityDedupKey entity = entry.getKey();
-                       if (System.currentTimeMillis() - 7 * 
DateUtils.MILLIS_PER_DAY > entity.createdTime) {
-                               removedkeys.add(entry.getKey());
-                       }
-               }
-               for (EntityDedupKey alertKey : removedkeys) {
-                       entites.remove(alertKey);
-               }
-       }
-       
-       public AlertDeduplicationStatus checkDedup(EntityDedupKey key){
-               long current = key.timestamp;
-               if(!entites.containsKey(key)){
-                       entites.put(key, current);
-                       return AlertDeduplicationStatus.NEW;
-               }
-               
-               long last = entites.get(key);
-               if(current - last >= dedupIntervalMin * 
DateUtils.MILLIS_PER_MINUTE){
-                       entites.put(key, current);
-                       return AlertDeduplicationStatus.DUPLICATED;
-               }
-               
-               return AlertDeduplicationStatus.IGNORED;
-       }
-
-       private List<String> getKeyList(AlertAPIEntity entity) {
-               List<String> keys = new ArrayList<>(entity.getTags().values());
-               if(fields != null && !fields.isEmpty()) {
-                       for (String field: fields) {
-                               AlertContext context = 
entity.getWrappedAlertContext();
-                               keys.add(context.getProperty(field));
-                       }
-               }
-               return keys;
-       }
-
-       public List<AlertAPIEntity> dedup(List<AlertAPIEntity> list) {
-               clearOldCache();
-               List<AlertAPIEntity> dedupList = new ArrayList<>();
-        int totalCount = list.size();
-        int dedupedCount = 0;
-               for(AlertAPIEntity entity: list) {
-                       if (entity.getTags() == null) {
-                               if(LOG.isDebugEnabled()) LOG.debug("Tags is 
null, don't know how to deduplicate, do nothing");
-                       } else {
-                AlertDeduplicationStatus status = checkDedup(new 
EntityDedupKey(getKeyList(entity), entity.getTimestamp()));
-                if (!status.equals(AlertDeduplicationStatus.IGNORED)) {
-                    dedupList.add(entity);
-                } else {
-                    dedupedCount++;
-                    if (LOG.isDebugEnabled())
-                        LOG.debug(String.format("Entity is skipped because 
it's duplicated: " + entity.toString()));
-                }
-            }
-               }
-
-        if(dedupedCount>0){
-            LOG.info(String.format("Skipped %s of %s alerts because they are 
duplicated", dedupedCount, totalCount));
-        }else if(LOG.isDebugEnabled()){
-            LOG.debug(String.format("Skipped %s of %s duplicated 
alerts",dedupedCount,totalCount));
-        }
-
-               return dedupList;
-       }
-
-       public EntityDeduplicator setDedupIntervalMin(long dedupIntervalMin) {
-               this.dedupIntervalMin = dedupIntervalMin;
-               return this;
-       }
-       
-       public long getDedupIntervalMin() {
-               return dedupIntervalMin;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
deleted file mode 100644
index 36b83e1..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDedupKey.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.dedup;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-public class EntityDedupKey {
-    public List<String> values;
-    public Long timestamp;     // entity's timestamp
-    public long createdTime; // entityTagsUniq's created time, for cache 
removal;
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(EntityDedupKey.class);
-
-    public EntityDedupKey(List<String> values, long timestamp) {
-        this.values = new ArrayList<>(values);
-        this.timestamp = timestamp;
-        this.createdTime = System.currentTimeMillis();
-    }
-
-    public boolean equals(Object obj) {
-        if (obj instanceof EntityDedupKey) {
-            EntityDedupKey key = (EntityDedupKey) obj;
-            if (key == null || key.values.size() != values.size()) {
-                return false;
-            }
-            return values.equals(key.values);
-        }
-        return false;
-    }
-
-    public int hashCode() {
-        HashCodeBuilder builder = new HashCodeBuilder();
-        for (String value : values) {
-            builder.append(value);
-        }
-        return builder.build();
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
deleted file mode 100644
index 85dd19a..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityDeduplicator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.List;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-/**
- * Dedup Eagle entities.
- */
-public interface EntityDeduplicator {
-       
-       EntityDeduplicator setDedupIntervalMin(long intervalMin);
-       
-       long getDedupIntervalMin();
-       
-       List dedup(List<AlertAPIEntity> list);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
deleted file mode 100644
index 81c8ba6..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/EntityTagsUniq.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package org.apache.eagle.alert.dedup;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Mar 19, 2015
- */
-public class EntityTagsUniq {
-       public Map<String, String> tags;
-       public Long timestamp;   // entity's timestamp
-       public long createdTime; // entityTagsUniq's created time, for cache 
removal;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(EntityTagsUniq.class);
-       
-       public EntityTagsUniq(Map<String, String> tags, long timestamp) {
-               this.tags = new HashMap<String, String>(tags);
-               this.timestamp = timestamp;
-               this.createdTime = System.currentTimeMillis();
-       }
-       
-       @Override       
-       public boolean equals(Object obj) {             
-               if (obj instanceof EntityTagsUniq) {
-                       EntityTagsUniq au = (EntityTagsUniq) obj;
-                       if (tags.size() != au.tags.size()) return false;
-                       for (Entry<String, String> keyValue : 
au.tags.entrySet()) {
-                               boolean keyExist = 
tags.containsKey(keyValue.getKey());
-                               // sanity check
-                               if (tags.get(keyValue.getKey()) == null || 
keyValue.getValue() == null) {
-                                       return true;
-                               }
-                               if ( !keyExist || 
!tags.get(keyValue.getKey()).equals(keyValue.getValue())) {                     
      
-                                       return false;
-                               }
-                       }
-                       return true; 
-               }
-               return false;
-       }
-       
-       @Override
-       public int hashCode() { 
-               int hashCode = 0;
-               for (Map.Entry<String,String> entry : tags.entrySet()) {
-            if(entry.getValue() == null) {
-                LOG.warn("Tag value for key ["+entry.getKey()+"] is null, 
skipped for hash code");
-            }else {
-                try {
-                    hashCode ^= entry.getValue().hashCode();
-                } catch (Throwable t) {
-                    LOG.info("Got exception because of entry: " + entry, t);
-                }
-            }
-               }
-               return hashCode;
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
deleted file mode 100644
index 2eee6c5..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.apache.eagle.policy.executor.PolicyProcessExecutor;
-import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
-
-public class AlertExecutor extends 
PolicyProcessExecutor<AlertDefinitionAPIEntity, AlertAPIEntity> {
-
-       private final SiddhiAlertAPIEntityRender resultRender = new 
SiddhiAlertAPIEntityRender();
-
-       public AlertExecutor(String alertExecutorId, PolicyPartitioner 
partitioner, int numPartitions, int partitionSeq,
-                       PolicyDefinitionDAO<AlertDefinitionAPIEntity> 
alertDefinitionDao, String[] sourceStreams) {
-               super(alertExecutorId, partitioner, numPartitions, 
partitionSeq, alertDefinitionDao, sourceStreams,
-                               AlertDefinitionAPIEntity.class);
-       }
-
-       @Override
-       public ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity> 
getResultRender() {
-               return resultRender;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
deleted file mode 100644
index 8ab290e..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.PolicyPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValue;
-
-/**
- * Create alert executors and provide callback for programmer to link alert 
executor to immediate parent executors
- *
- * <br/><br/>
- * Explanations for programId, alertExecutorId and policy<br/><br/>
- * - programId - distributed or single-process program for example one storm 
topology<br/>
- * - alertExecutorId - one process/thread which executes multiple policies<br/>
- * - policy - some rules to be evaluated<br/>
- *
- * <br/>
- *
- * Normally the mapping is like following:
- * <pre>
- * programId (1:N) alertExecutorId
- * alertExecutorId (1:N) policy
- * </pre>
- */
-public class AlertExecutorCreationUtils {
-       private final static Logger LOG = 
LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
-
-
-    /**
-     * Build DAG Tasks based on persisted alert definition and schemas from 
eagle store.
-     *
-     * <h3>Require configuration:</h3>
-     *
-     * <ul>
-     * <li>eagleProps.site: program site id.</li>
-     * <li>eagleProps.dataSource: program data source.</li>
-     * <li>alertExecutorConfigs: only configured executor will be built into 
execution tasks.</li>
-     * </ul>
-     *
-     * <h3>Steps:</h3>
-     *
-     * <ol>
-     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
-     * <li>(dataSource) => 
Map[alertExecutorId:String,streamName:List[String]]</li>
-     * <li>(site,dataSource) => 
Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
-     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, 
partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, 
sourceStreams)[]</li>
-     * </ol>
-     */
-       public static AlertExecutor[] createAlertExecutors(Config config, 
PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefDAO,
-                       List<String> streamNames, String alertExecutorId) 
throws Exception{
-               // Read `alertExecutorConfigs` from configuration and get 
config for this alertExecutorId
-        int numPartitions =1;
-        String partitionerCls = 
DefaultPolicyPartitioner.class.getCanonicalName();
-        String alertExecutorConfigsKey = "alertExecutorConfigs";
-        if(config.hasPath(alertExecutorConfigsKey)) {
-            Map<String, ConfigValue> alertExecutorConfigs = 
config.getObject(alertExecutorConfigsKey);
-            if(alertExecutorConfigs !=null && 
alertExecutorConfigs.containsKey(alertExecutorId)) {
-                Map<String, Object> alertExecutorConfig = (Map<String, 
Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
-                int parts = 0;
-                if(alertExecutorConfig.containsKey("parallelism")) parts = 
(int) (alertExecutorConfig.get("parallelism"));
-                numPartitions = parts == 0 ? 1 : parts;
-                if(alertExecutorConfig.containsKey("partitioner")) 
partitionerCls = (String) alertExecutorConfig.get("partitioner");
-            }
-        }
-
-        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, 
numPartitions, partitionerCls);
-       }
-
-    /**
-     * Build alert executors and assign alert definitions between these 
executors by partitioner 
(alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
-     */
-       public static AlertExecutor[] createAlertExecutors(PolicyDefinitionDAO 
alertDefDAO, List<String> sourceStreams,
-                                                          String 
alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
-               LOG.info("Creating alert executors with alertExecutorID: " + 
alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: 
"+ partitionerCls);
-
-        // TODO: Create sourceStreams with alertExecutorID into 
AlertExecutorService
-
-               PolicyPartitioner partitioner = 
(PolicyPartitioner)Class.forName(partitionerCls).newInstance();
-               AlertExecutor[] alertExecutors = new 
AlertExecutor[numPartitions];
-        String[] _sourceStreams = sourceStreams.toArray(new String[0]);
-
-               for(int i = 0; i < numPartitions; i++){
-                       alertExecutors[i] = new AlertExecutor(alertExecutorID, 
partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
-               }       
-               return alertExecutors;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index af42dd3..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.common.AlertEmailSender;
-import org.apache.eagle.alert.email.AlertEmailComponent;
-import org.apache.eagle.alert.email.AlertEmailContext;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class AlertEmailGenerator{
-       private String tplFile;
-       private String sender;
-       private String recipients;
-       private String subject;
-       private ConfigObject eagleProps;
-
-    private ThreadPoolExecutor executorPool;
-
-    private final static Logger LOG = 
LoggerFactory.getLogger(AlertEmailGenerator.class);
-
-    private final static long MAX_TIMEOUT_MS =60000;
-
-    public void sendAlertEmail(AlertAPIEntity entity) {
-               sendAlertEmail(entity, recipients, null);
-       }
-       
-       public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
-               sendAlertEmail(entity, recipients, null);       
-       }
-       
-       public void sendAlertEmail(AlertAPIEntity entity, String recipients, 
String cc) {
-               AlertEmailContext email = new AlertEmailContext();
-               
-               AlertEmailComponent component = new AlertEmailComponent();
-               
component.setAlertContext(AlertContext.fromJsonString(entity.getAlertContext()));
-               List<AlertEmailComponent> components = new 
ArrayList<AlertEmailComponent>();
-               components.add(component);              
-               email.setComponents(components);
-               if 
(AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT)
 != null) {
-                       
email.setSubject(AlertContext.fromJsonString(entity.getAlertContext()).getProperty(Constants.SUBJECT));
-               }
-               else email.setSubject(subject);
-               email.setVelocityTplFile(tplFile);
-               email.setRecipients(recipients);
-               email.setCc(cc);
-               email.setSender(sender);
-               
-               /** asynchronized email sending */
-               @SuppressWarnings("rawtypes")
-        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
-
-        if(this.executorPool == null) throw new 
IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
-        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
-        Future future = this.executorPool.submit(thread);
-        try {
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            LOG.info(String.format("Successfully send email to %s", 
recipients));
-        } catch (InterruptedException | ExecutionException  e) {
-            LOG.error(String.format("Failed to send email to %s, due 
to:%s",recipients,e),e);
-        } catch (TimeoutException e) {
-            LOG.error(String.format("Failed to send email to %s due to timeout 
exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
-        }
-    }
-       
-       public String getTplFile() {
-               return tplFile;
-       }
-       
-       public void setTplFile(String tplFile) {
-               this.tplFile = tplFile;
-       }
-
-       public String getSender() {
-               return sender;
-       }
-
-       public void setSender(String sender) {
-               this.sender = sender;
-       }
-
-       public String getRecipients() {
-               return recipients;
-       }
-
-       public void setRecipients(String recipients) {
-               this.recipients = recipients;
-       }
-
-       public String getSubject() {
-               return subject;
-       }
-
-       public void setSubject(String subject) {
-               this.subject = subject;
-       }
-
-       public ConfigObject getEagleProps() {
-               return eagleProps;
-       }
-
-       public void setEagleProps(ConfigObject eagleProps) {
-               this.eagleProps = eagleProps;
-       }
-
-    public void setExecutorPool(ThreadPoolExecutor executorPool) {
-        this.executorPool = executorPool;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
deleted file mode 100644
index b024c39..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import scala.Tuple1;
-
-/**
- * notify alert by email, kafka message, storage or other means
- */
-public class AlertNotificationExecutor extends 
JavaStormStreamExecutor1<String> implements 
PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
-       private static final long serialVersionUID = 1690354365435407034L;
-       private static final Logger LOG = 
LoggerFactory.getLogger(AlertNotificationExecutor.class);
-       private Config config;
-       /** Notification Manager - Responsible for forward and invoke 
configured Notification Plugin **/
-       private NotificationPluginManagerImpl notificationManager;
-
-       private List<String> alertExecutorIdList;
-       private PolicyDefinitionDAO dao;
-
-
-    public AlertNotificationExecutor(List<String> alertExecutorIdList, 
PolicyDefinitionDAO dao){
-               this.alertExecutorIdList = alertExecutorIdList;
-               this.dao = dao;
-       }
-
-       @Override
-       public void init() {
-               String site = config.getString("eagleProps.site");
-               String application = config.getString("eagleProps.application");
-               Map<String, Map<String, AlertDefinitionAPIEntity>> 
initialAlertDefs;
-               try {
-                       initialAlertDefs = 
dao.findActivePoliciesGroupbyExecutorId( site, application );
-               }
-               catch (Exception ex) {
-                       LOG.error("fail to initialize initialAlertDefs: ", ex);
-                       throw new IllegalStateException("fail to initialize 
initialAlertDefs: ", ex);
-               }
-
-               if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-                       LOG.warn("No alert definitions found for site: 
"+site+", application: "+ application);
-               }
-               try{
-                       notificationManager = new 
NotificationPluginManagerImpl(config);
-               }catch (Exception ex ){
-                       LOG.error("Fail to initialize NotificationManager: ", 
ex);
-                       throw new IllegalStateException("Fail to initialize 
NotificationManager: ", ex);
-               }
-
-               DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = 
DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
-               policyLoader.init(initialAlertDefs, dao, config);
-               for (String alertExecutorId : alertExecutorIdList) {
-                       policyLoader.addPolicyChangeListener(alertExecutorId, 
this);
-               }
-       }
-
-       @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-
-       @Override
-       public void flatMap(java.util.List<Object> input, 
Collector<Tuple1<String>> outputCollector){
-               AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-               processAlerts(Arrays.asList(alertEntity));
-       }
-
-       private void processAlerts(List<AlertAPIEntity> list) {
-               for (AlertAPIEntity entity : list) {
-                       notificationManager.notifyAlert(entity);
-               }
-       }
-
-       @Override
-       public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> 
added) {
-               if(LOG.isDebugEnabled()) LOG.debug(" alert notification config 
changed : " + added);
-               for(AlertDefinitionAPIEntity alertDef : added.values()){
-                       LOG.info("alert notification config really changed " + 
alertDef);
-                       notificationManager.updateNotificationPlugins( alertDef 
, false );
-               }
-       }
-
-       @Override
-       public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> 
changed) {
-               if(LOG.isDebugEnabled()) LOG.debug("alert notification config 
to be added : " + changed);
-               for(AlertDefinitionAPIEntity alertDef : changed.values()){
-                       LOG.info("alert notification config really added " + 
alertDef);
-                       notificationManager.updateNotificationPlugins( alertDef 
, false );
-               }
-       }
-
-       @Override
-       public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> 
deleted) {
-               if(LOG.isDebugEnabled()) LOG.debug("alert notification config 
to be deleted : " + deleted);
-               for(AlertDefinitionAPIEntity alertDef : deleted.values()){
-                       LOG.info("alert notification config really deleted " + 
alertDef);
-                       notificationManager.updateNotificationPlugins( alertDef 
, true );
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
deleted file mode 100644
index 61bb7dc..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/AlertPersistExecutor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.persist;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import scala.Tuple1;
-
-import java.util.Arrays;
-
-public class AlertPersistExecutor extends JavaStormStreamExecutor1<String> {
-
-       private static final long serialVersionUID = 1L;
-       private Config config;
-       private EaglePersist persist;
-
-       public AlertPersistExecutor(){
-       }
-    @Override
-       public void prepareConfig(Config config) {
-               this.config = config;           
-       }
-
-    @Override
-       public void init() {
-               String host = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-               int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-               String username = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME)
-                               ? 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : 
null;
-               String password = 
config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)
-                               ? 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : 
null;
-               this.persist = new EaglePersist(host, port, username, password);
-       }
-
-    @Override
-    public void flatMap(java.util.List<Object> input, 
Collector<Tuple1<String>> outputCollector){
-        persist.doPersist(Arrays.asList((AlertAPIEntity)(input.get(1))));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
deleted file mode 100644
index ebba518..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/persist/EaglePersist.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package org.apache.eagle.alert.persist;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public class EaglePersist {
-               
-       private static Logger LOG = LoggerFactory.getLogger(EaglePersist.class);
-       private String eagleServiceHost;
-       private int eagleServicePort;
-       private String username;
-       private String password;
-
-       public EaglePersist(String eagleServiceHost, int eagleServicePort) {
-               this(eagleServiceHost, eagleServicePort, null, null);
-       }
-
-       public EaglePersist(String eagleServiceHost, int eagleServicePort, 
String username, String password) {
-               this.eagleServiceHost = eagleServiceHost;
-               this.eagleServicePort = eagleServicePort;
-               this.username = username;
-               this.password = password;
-       }
-       
-       public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
-               if (list.isEmpty()) return false;
-               LOG.info("Going to persist entities, type: " + " " + 
list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
-               try {
-                       IEagleServiceClient client = new 
EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
-                       GenericServiceAPIResponseEntity<String> response = 
client.create(list);
-                       client.close();
-                       if (response.isSuccess()) {
-                               LOG.info("Successfully create entities " + 
list.toString());
-                               return true;
-                       }
-                       else {
-                               LOG.error("Fail to create entities");
-                               return false;
-                       }
-               }
-               catch (Exception ex) {
-                       LOG.error("Got an exception in persisting entities" + 
ex.getMessage(), ex);
-                       return false;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
deleted file mode 100644
index c7ff74c..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.common.metric.AlertContext;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.ResultRender;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.common.UrlBuilder;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.apache.eagle.policy.siddhi.SiddhiQueryCallbackImpl;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-public class SiddhiAlertAPIEntityRender implements 
ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity>, Serializable {
-
-       public static final Logger LOG = 
LoggerFactory.getLogger(SiddhiAlertAPIEntityRender.class);
-       public static final String source = 
ManagementFactory.getRuntimeMXBean().getName();
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public AlertAPIEntity render(Config config, List<Object> results, 
PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> 
siddhiAlertContext, long timestamp) {
-               List<String> rets = 
SiddhiQueryCallbackImpl.convertToString(results);
-               SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> 
evaluator = (SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity>) 
siddhiAlertContext.evaluator;
-               String alertExecutorId = 
siddhiAlertContext.alertExecutor.getExecutorId();
-               AlertAPIEntity entity = new AlertAPIEntity();
-               AlertContext context = new AlertContext();
-               String sourceStreams = 
evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS);
-               String[] sourceStreamsArr = sourceStreams.split(",");           
-               List<String> attrRenameList = 
evaluator.getOutputStreamAttrNameList();          
-               Map<String, String> tags = new HashMap<String, String>();
-               for (String sourceStream : sourceStreamsArr) {
-                        List<AlertStreamSchemaEntity> list = 
StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim());
-                        for (AlertStreamSchemaEntity alertStream : list) {
-                                if (alertStream.getUsedAsTag() != null && 
alertStream.getUsedAsTag() == true) {
-                                        String attrName = 
alertStream.getTags().get(Constants.ATTR_NAME);
-                                        tags.put(attrName, 
rets.get(attrRenameList.indexOf(attrName)));
-                                }                               
-                        }                       
-               }
-
-               for (int index = 0; index < rets.size(); index++) {
-                       //attrRenameList.get(0) -> "eagleAlertContext". We need 
to skip "eagleAlertContext", index is from 1 for attRenameList.
-                       context.addProperty(attrRenameList.get(index), 
rets.get(index));
-               }
-
-               StringBuilder sb = new StringBuilder();
-               for (Entry<String, String> entry : 
context.getProperties().entrySet()) {
-                       String key = entry.getKey();
-                       String value = entry.getValue();
-                       sb.append(key + "=\"" + value + "\" ");                 
-               }
-               context.addAll(evaluator.getAdditionalContext());
-               String policyId = context.getProperty(Constants.POLICY_ID);
-               String alertMessage = "The Policy \"" + policyId + "\" has been 
detected with the below information: " + sb.toString() ;
-               String site = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.SITE);
-               String application = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.APPLICATION);
-               String host = config.getString(EagleConfigConstants.EAGLE_PROPS 
+ "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-               Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + 
"." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-               context.addProperty(Constants.ALERT_EVENT, sb.toString());
-               context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
-               context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
-               context.addProperty(EagleConfigConstants.APPLICATION, 
application);
-               context.addProperty(EagleConfigConstants.SITE, site);
-               entity.setTimestamp(timestamp);
-               /** If we need to add severity tag, we should add severity 
filed in AbstractpolicyDefinition, and pass it down **/
-               tags.put(EagleConfigConstants.SITE, site);
-               tags.put(EagleConfigConstants.APPLICATION, application);
-               tags.put(Constants.SOURCE_STREAMS, 
context.getProperty(Constants.SOURCE_STREAMS));
-               tags.put(Constants.POLICY_ID, 
context.getProperty(Constants.POLICY_ID));
-               tags.put(Constants.ALERT_SOURCE, source);
-               tags.put(Constants.ALERT_EXECUTOR_ID, alertExecutorId);
-               entity.setTags(tags);
-
-               context.addProperty(Constants.POLICY_DETAIL_URL, 
UrlBuilder.buiildPolicyDetailUrl(host, port, tags));
-               context.addProperty(Constants.ALERT_DETAIL_URL, 
UrlBuilder.buildAlertDetailUrl(host, port, entity));
-               entity.setAlertContext(context.toJsonString());
-               return entity;
-       }       
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index eac2bfd..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
deleted file mode 100644
index d4d3795..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/main/resources/table-create.script
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-##### create alert related tables
-create 'eagle_metric', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', 
COMPRESSION => 'SNAPPY'}
-create 'alertdetail', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', 
COMPRESSION => 'SNAPPY'}
-
-create 'alertDataSource', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', 
COMPRESSION => 'SNAPPY'}
-create 'alertStream', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', 
COMPRESSION => 'SNAPPY'}
-create 'alertExecutor', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', 
COMPRESSION => 'SNAPPY'}
-create 'alertStreamSchema', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 
'ROW', COMPRESSION => 'SNAPPY'}
-create 'alertdef', {NAME => 'f', VERSIONS => '3', BLOOMFILTER => 'ROW', 
COMPRESSION => 'SNAPPY'}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
deleted file mode 100644
index edfb15f..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-Manifest-Version: 1.0
-Class-Path: 
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
deleted file mode 100644
index 20afc95..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.cep;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.*;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import scala.Tuple2;
-
-import java.util.*;
-import java.util.concurrent.Semaphore;
-
-public class TestSiddhiEvaluator {
-       private AlertStreamSchemaEntity createStreamMetaEntity(String attrName, 
String type) {
-               AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-               Map<String, String> tags = new HashMap<String, String>();
-               tags.put("application", "hdfsAuditLog");
-               tags.put("streamName", "hdfsAuditLogEventStream");
-               tags.put("attrName", attrName);
-               entity.setTags(tags);
-               entity.setAttrType(type);
-               return entity;
-       }
-
-       private int alertCount;
-       private Semaphore semaphore;
-       @Before
-       public void before(){
-               alertCount = 0;
-               semaphore = new Semaphore(0);
-       }
-
-       @Test
-       public void test() throws Exception{
-               Config config = ConfigFactory.load("unittest.conf");
-               AlertStreamSchemaDAO streamDao = new 
AlertStreamSchemaDAOImpl(null, null) {
-                       @Override
-                       public List<AlertStreamSchemaEntity> 
findAlertStreamSchemaByApplication(String dataSource) throws Exception {
-                               List<AlertStreamSchemaEntity> list = new 
ArrayList<AlertStreamSchemaEntity>();
-                               list.add(createStreamMetaEntity("cmd", 
"string"));
-                               list.add(createStreamMetaEntity("dst", 
"string"));
-                               list.add(createStreamMetaEntity("src", 
"string"));
-                               list.add(createStreamMetaEntity("host", 
"string"));
-                               list.add(createStreamMetaEntity("user", 
"string"));
-                               list.add(createStreamMetaEntity("timestamp", 
"long"));
-                               list.add(createStreamMetaEntity("securityZone", 
"string"));
-                               
list.add(createStreamMetaEntity("sensitivityType", "string"));
-                               list.add(createStreamMetaEntity("allowed", 
"string"));
-                               return list;
-                       }
-               };
-        StreamMetadataManager.getInstance().reset();
-        StreamMetadataManager.getInstance().init(config, streamDao);
-
-               Map<String, Object> data1 =  new TreeMap<String, Object>(){{
-                       put("cmd", "open");
-                       put("dst", "");
-                       put("src", "");
-                       put("host", "");
-                       put("user", "");
-                       put("timestamp", 
String.valueOf(System.currentTimeMillis()));
-                       put("securityZone", "");
-                       put("sensitivityType", "");
-                       put("allowed", "true");
-               }};
-        final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
-        policyDef.setType("siddhiCEPEngine");
-        String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
-                                                       "select * " +
-                                                       "insert into 
outputStream ;";
-        policyDef.setExpression(expression);
-
-               PolicyDefinitionDAO alertDao = new 
PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, null),
-                               
Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
-                       @Override
-                       public Map<String, Map<String, 
AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, 
String dataSource) throws Exception {
-                               return null;
-                       }
-
-            @Override
-            public void updatePolicyDetails(AbstractPolicyDefinitionEntity 
entity) { /* do nothing */ }
-        };
-
-               AlertExecutor alertExecutor = new 
AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new 
String[]{"hdfsAuditLogEventStream"}) {
-                       @Override
-                       protected Map<String, String> getDimensions(String 
policyId) {
-                               return new HashMap<>();
-                       }
-               };
-               alertExecutor.prepareConfig(config);
-               alertExecutor.init();
-
-               PolicyEvaluationContext<AlertDefinitionAPIEntity, 
AlertAPIEntity> context = new PolicyEvaluationContext<>();
-               context.alertExecutor = alertExecutor;
-               context.policyId = "testPolicy";
-               context.resultRender = new SiddhiAlertAPIEntityRender();
-               context.outputCollector = (Collector<Tuple2<String, 
AlertAPIEntity>>) (stringAlertAPIEntityTuple2) -> {
-                       alertCount++;
-                       semaphore.release();
-               };
-               SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> 
evaluator =
-                               new SiddhiPolicyEvaluator<>(config, context, 
policyDef, new String[]{"hdfsAuditLogEventStream"}, false);
-               evaluator.evaluate(new ValuesArray(context.outputCollector, 
"hdfsAuditLogEventStream", data1));
-               Thread.sleep(2 * 1000);
-               semaphore.acquire();
-               Assert.assertEquals(alertCount, 1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
deleted file mode 100644
index f6d6a63..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.config;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-
-public class TestAlertDedup {
-
-       @Test
-       public void test() throws Exception{
-               String alertDef = 
"{\"alertDedupIntervalMin\":\"10\",\"fields\":[\"key1\",\"key2\",\"key3\"]}";
-               DeduplicatorConfig dedupConfig = 
JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
-               Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 10);
-               Assert.assertEquals(dedupConfig.getFields().size(), 3);
-               
-               alertDef = "null";
-               dedupConfig = JsonSerDeserUtils.deserialize(alertDef, 
DeduplicatorConfig.class);
-               Assert.assertEquals(dedupConfig, null);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
deleted file mode 100644
index f7dcdde..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestAlertDefinitionDAOImpl {
-
-       public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, 
String programId, String alertExecutorId, String policyId, String policyType) {
-               AlertDefinitionAPIEntity entity = new 
AlertDefinitionAPIEntity();
-               entity.setEnabled(true);
-               Map<String, String> tags = new HashMap<String, String>();
-               tags.put("site", site);
-               tags.put("programId", programId);
-               tags.put("alertExecutorId", alertExecutorId);
-               tags.put("policyId", policyId);
-               tags.put("policyType", policyType);
-               entity.setTags(tags);
-               return entity;
-       }
-       
-       @Test
-       public void test() throws Exception{
-               Config config = ConfigFactory.load();
-               String eagleServiceHost = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-               int eagleServicePort = 
config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-               String site = "sandbox";
-               String dataSource = "UnitTest";
-               PolicyDefinitionDAO dao = new PolicyDefinitionEntityDAOImpl(new 
EagleServiceConnector(eagleServiceHost, eagleServicePort),
-                               
Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
-                       @Override
-                       public List<AlertDefinitionAPIEntity> 
findActivePolicies(String site, String dataSource) throws Exception {
-                               List<AlertDefinitionAPIEntity> list = new 
ArrayList<AlertDefinitionAPIEntity>();
-                               list.add(buildTestAlertDefEntity(site, 
dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
-                               list.add(buildTestAlertDefEntity(site, 
dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
-                               list.add(buildTestAlertDefEntity(site, 
dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
-                               list.add(buildTestAlertDefEntity(site, 
dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
-                               return list;
-                       }
-
-            @Override
-                       public void 
updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
-               };
-
-               Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = 
dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
-               
-               Assert.assertEquals(2, retMap.size());
-               Assert.assertEquals(2, retMap.get("TestExecutor1").size());
-               Assert.assertEquals(2, retMap.get("TestExecutor2").size());
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
deleted file mode 100644
index d703214..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-public class TestSiddhiStreamMetadataUtils {
-       @Test
-       public void test() throws Exception {
-        Config config = ConfigFactory.load();
-        StreamMetadataManager.getInstance().reset();
-        StreamMetadataManager.getInstance().init(config, new 
AlertStreamSchemaDAO() {
-            @Override
-            public List<AlertStreamSchemaEntity> 
findAlertStreamSchemaByApplication(
-                    String application) {
-                return 
Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"),
-                        generateStreamMetadataAPIEntity("attrName2", "LONG")
-                );
-            }
-        });
-               String siddhiStreamDef = 
SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
-               Assert.assertEquals("define stream " + "testStreamName" + 
"(attrName1 string,attrName2 long);", siddhiStreamDef);
-       }
-       
-       private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final 
String attrName, String attrType){
-               AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-               entity.setTags(new HashMap<String, String>(){{
-                       put("programId", "testProgramId");
-                       put("streamName", "testStreamName");
-                       put("attrName", attrName);
-               }});
-               entity.setAttrType(attrType);
-               return entity;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
deleted file mode 100644
index 0bbfc4a..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestStreamDefinitionDAOImpl {
-       
-       public AlertStreamSchemaEntity buildTestStreamDefEntity(String 
programId, String streamName, String attrName) {
-               AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-               entity.setAttrType("String");
-               entity.setAttrValueResolver("DefaultAttrValueResolver");
-               entity.setCategory("SimpleType");
-               Map<String, String> tags = new HashMap<String, String>();
-               tags.put("programId", programId);
-               tags.put("streamName", streamName);
-               tags.put("attrName", attrName);
-               entity.setTags(tags);
-               return entity;
-       }
-       
-       @Test
-       public void test() throws Exception{
-        Config config = ConfigFactory.load();
-               AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, 
null) {
-                       public List<AlertStreamSchemaEntity> 
findAlertStreamSchemaByApplication(String application) throws Exception {
-                               List<AlertStreamSchemaEntity> list = new 
ArrayList<AlertStreamSchemaEntity>();
-                               String programId = "UnitTest";
-                               list.add(buildTestStreamDefEntity(programId, 
"TestStream", "Attr1"));
-                               list.add(buildTestStreamDefEntity(programId, 
"TestStream", "Attr2"));
-                               list.add(buildTestStreamDefEntity(programId, 
"TestStream", "Attr3"));
-                               list.add(buildTestStreamDefEntity(programId, 
"TestStream", "Attr4"));
-                               return list;
-                       }
-               };
-        StreamMetadataManager.getInstance().reset();
-               StreamMetadataManager.getInstance().init(config, dao);
-               Map<String, List<AlertStreamSchemaEntity>> retMap = 
StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
-               Assert.assertTrue(retMap.get("TestStream").size() == 4);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
deleted file mode 100644
index bb94908..0000000
--- 
a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/executor/TestPolicyExecutor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.executor;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.junit.Assert;
-
-/**
- * @since Dec 18, 2015
- *
- */
-public class TestPolicyExecutor {
-
-       public static class T2 extends AbstractPolicyDefinitionEntity {
-               @Override
-               public String getPolicyDef() {
-                       return null;
-               }
-               @Override
-               public boolean isEnabled() {
-                       return false;
-               }
-       }
-
-       // not feasible to Unit test, it requires the local service.
-       @Ignore
-       @Test
-       public void testReflectCreatePolicyEvaluator() throws Exception {
-               System.setProperty("config.resource", "/unittest.conf");
-               String policyType = Constants.policyType.siddhiCEPEngine.name();
-               Class<? extends PolicyEvaluator> evalCls = 
PolicyManager.getInstance().getPolicyEvaluator(policyType);
-               Config config = ConfigFactory.load();
-
-               String def = "{\"expression\":\"from hdfsAuditLogEventStream 
select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}";
-               // test1 : test json deserialization
-               AbstractPolicyDefinition policyDef = null;
-               policyDef = JsonSerDeserUtils.deserialize(def, 
AbstractPolicyDefinition.class,
-                               
PolicyManager.getInstance().getPolicyModules(policyType));
-               // Assert conversion succeed
-               Assert.assertEquals(SiddhiPolicyDefinition.class, 
policyDef.getClass());
-
-               // make sure meta data manager initialized
-               StreamMetadataManager.getInstance().init(config, new 
AlertStreamSchemaDAOImpl(config));
-
-               String[] sourceStreams = new String[] { 
"hdfsAuditLogEventStream" };
-               // test2 : test evaluator
-               PolicyEvaluator pe = evalCls.getConstructor(Config.class, 
String.class, AbstractPolicyDefinition.class,
-                               String[].class, 
boolean.class).newInstance(config, "policy-id", policyDef, sourceStreams, 
false);
-
-               PolicyEvaluator<AlertDefinitionAPIEntity> e1 = 
(PolicyEvaluator<AlertDefinitionAPIEntity>) pe;
-
-               PolicyEvaluator<T2> e2 = (PolicyEvaluator<T2>) pe;
-
-       }
-
-}


Reply via email to