http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
deleted file mode 100755
index 3aa079e..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorThreadSafeWrapper.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.task.OutputCollector;
-
-/**
- * <h2>Thread Safe Mechanism</h2>
- * <ul>
- * <li>
- *     emit() method is thread-safe enough to be called anywhere 
asynchronously in multi-thread
- * </li>
- * <li>
- *     flush() method must be called synchronously, because Storm 
OutputCollector is not thread-safe
- * </li>
- * </ul>
- */
-public class AlertBoltOutputCollectorThreadSafeWrapper implements 
AlertStreamCollector {
-    private final OutputCollector delegate;
-    private final LinkedBlockingQueue<AlertStreamEvent> queue;
-    private final static Logger LOG = 
LoggerFactory.getLogger(AlertBoltOutputCollectorThreadSafeWrapper.class);
-    private final AtomicLong lastFlushTime = new 
AtomicLong(System.currentTimeMillis());
-    private final AutoAlertFlusher flusher;
-    private final static int MAX_ALERT_DELAY_SECS = 10;
-
-    public AlertBoltOutputCollectorThreadSafeWrapper(OutputCollector 
outputCollector){
-        this.delegate = outputCollector;
-        this.queue = new LinkedBlockingQueue<>();
-        this.flusher = new AutoAlertFlusher(this);
-        this.flusher.setName(Thread.currentThread().getName()+"-alertFlusher");
-        this.flusher.start();
-    }
-
-    private static class AutoAlertFlusher extends Thread{
-        private final AlertBoltOutputCollectorThreadSafeWrapper collector;
-        private boolean stopped = false;
-        private final static Logger LOG = 
LoggerFactory.getLogger(AutoAlertFlusher.class);
-
-        private AutoAlertFlusher(AlertBoltOutputCollectorThreadSafeWrapper 
collector){
-            this.collector = collector;
-        }
-
-        @Override
-        public void run() {
-            LOG.info("Starting");
-            while(!this.stopped){
-                if(System.currentTimeMillis() - collector.lastFlushTime.get() 
>= MAX_ALERT_DELAY_SECS * 1000L){
-                    this.collector.flush();
-                }
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException ignored) {}
-            }
-            LOG.info("Stopped");
-        }
-        public void shutdown(){
-            LOG.info("Stopping");
-            this.stopped = true;
-        }
-    }
-
-    /**
-     * Emit method can be called in multi-thread
-     * @param event
-     */
-    @Override
-    public void emit(AlertStreamEvent event) {
-        try {
-            queue.put(event);
-        } catch (InterruptedException e) {
-            LOG.error(e.getMessage(),e);
-        }
-    }
-
-    /**
-     * Flush will be called in synchronous way like StormBolt.execute() as 
Storm OutputCollector is not thread-safe
-     */
-    @Override
-    public void flush() {
-        if(!queue.isEmpty()) {
-            List<AlertStreamEvent> events = new ArrayList<>();
-            queue.drainTo(events);
-            events.forEach((event) -> 
delegate.emit(Arrays.asList(event.getStreamId(), event)));
-            LOG.info("Flushed {} alerts", events.size());
-        }
-        lastFlushTime.set(System.currentTimeMillis());
-    }
-
-    @Override
-    public void close() {
-        this.flusher.shutdown();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
deleted file mode 100755
index 8d8a4d2..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.Arrays;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-import backtype.storm.task.OutputCollector;
-
-public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector {
-    private final OutputCollector delegate;
-    private final Object outputLock;
-    private final StreamContext streamContext;
-
-    public AlertBoltOutputCollectorWrapper(OutputCollector outputCollector, 
Object outputLock, StreamContext streamContext){
-        this.delegate = outputCollector;
-        this.outputLock = outputLock;
-        this.streamContext = streamContext;
-    }
-
-    @Override
-    public void emit(AlertStreamEvent event) {
-        synchronized (outputLock) {
-            streamContext.counter().scope("alert_count").incr();
-            delegate.emit(Arrays.asList(event.getStreamId(), event));
-        }
-    }
-
-    @Override
-    public void flush() {
-        // do nothing
-    }
-
-    @Override
-    public void close() {
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
deleted file mode 100644
index f063618..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.AlertStreamCollector;
-import org.apache.eagle.alert.engine.StreamContext;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandlers;
-import org.apache.eagle.alert.engine.model.PartitionedEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
-    private static final long serialVersionUID = -5499413193675985288L;
-
-    private final static Logger LOG = 
LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
-
-    private AlertStreamCollector collector;
-    // mapping from policy name to PolicyDefinition
-    private volatile Map<String,PolicyDefinition> policyDefinitionMap = new 
HashMap<>();
-    // mapping from policy name to PolicyStreamHandler
-    private volatile Map<String,PolicyStreamHandler> policyStreamHandlerMap = 
new HashMap<>();
-    private String policyEvaluatorId;
-    private StreamContext context;
-
-    public PolicyGroupEvaluatorImpl(String policyEvaluatorId){
-        this.policyEvaluatorId = policyEvaluatorId;
-    }
-
-    public void init(StreamContext context, AlertStreamCollector collector) {
-        this.collector = collector;
-        this.policyStreamHandlerMap = new HashMap<>();
-        this.context = context;
-        Thread.currentThread().setName(policyEvaluatorId);
-    }
-
-    public void nextEvent(PartitionedEvent event) {
-        this.context.counter().scope("receive_count").incr();
-        dispatch(event);
-    }
-
-    @Override
-    public String getName() {
-        return this.policyEvaluatorId;
-    }
-
-    public void close() {
-        for(PolicyStreamHandler handler: policyStreamHandlerMap.values()){
-            try {
-                handler.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close handler {}",handler.toString(),e);
-            }
-        }
-    }
-
-    /**
-     * fixme make selection of PolicyStreamHandler to be more efficient
-     * @param partitionedEvent PartitionedEvent
-     */
-    private void dispatch(PartitionedEvent partitionedEvent){
-        boolean handled = false;
-        for(Map.Entry<String,PolicyStreamHandler> policyStreamHandler: 
policyStreamHandlerMap.entrySet()){
-            
if(isAcceptedByPolicy(partitionedEvent,policyDefinitionMap.get(policyStreamHandler.getKey()))){
-                try {
-                    handled = true;
-                    this.context.counter().scope("eval_count").incr();
-                    
policyStreamHandler.getValue().send(partitionedEvent.getEvent());
-                } catch (Exception e) {
-                    this.context.counter().scope("fail_count").incr();
-                    LOG.error("{} failed to handle 
{}",policyStreamHandler.getValue(), partitionedEvent.getEvent());
-                }
-            }
-        }
-        if(!handled){
-            this.context.counter().scope("drop_count").incr();
-            LOG.warn("Drop stream non-matched event {}, which should not be 
sent to evaluator", partitionedEvent);
-        } else {
-            this.context.counter().scope("accept_count").incr();
-        }
-    }
-
-    private static boolean isAcceptedByPolicy(PartitionedEvent event, 
PolicyDefinition policy){
-        return policy.getInputStreams() != null
-                && 
policy.getInputStreams().contains(event.getEvent().getStreamId())
-                && policy.getPartitionSpec().contains(event.getPartition());
-    }
-
-    @Override
-    public void onPolicyChange(List<PolicyDefinition> added, 
List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, 
StreamDefinition> sds) {
-        Map<String,PolicyDefinition> copyPolicies = new 
HashMap<>(policyDefinitionMap);
-        Map<String,PolicyStreamHandler> copyHandlers = new 
HashMap<>(policyStreamHandlerMap);
-        for(PolicyDefinition pd : added){
-            inplaceAdd(copyPolicies, copyHandlers, pd, sds);
-        }
-        for(PolicyDefinition pd : removed){
-            inplaceRemove(copyPolicies, copyHandlers, pd);
-        }
-        for(PolicyDefinition pd : modified){
-            inplaceRemove(copyPolicies, copyHandlers, pd);
-            inplaceAdd(copyPolicies, copyHandlers, pd, sds);
-        }
-
-        // logging
-        LOG.info("Policy metadata updated with added={}, removed={}, 
modified={}", added, removed, modified);
-
-        // switch reference
-        this.policyDefinitionMap = copyPolicies;
-        this.policyStreamHandlerMap = copyHandlers;
-    }
-
-    private void inplaceAdd(Map<String, PolicyDefinition> policies, 
Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy, Map<String, 
StreamDefinition> sds) {
-        if(handlers.containsKey(policy.getName())){
-            LOG.error("metadata calculation error, try to add existing 
PolicyDefinition " + policy);
-        }else {
-            policies.put(policy.getName(), policy);
-            PolicyStreamHandler handler = 
PolicyStreamHandlers.createHandler(policy.getDefinition().type, sds);
-            try {
-                PolicyHandlerContext context = new PolicyHandlerContext();
-                context.setPolicyCounter(this.context.counter());
-                context.setPolicyDefinition(policy);
-                context.setParentEvaluator(this);
-                context.setPolicyEvaluatorId(policyEvaluatorId);
-                handler.prepare(collector, context);
-                handlers.put(policy.getName(), handler);
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
-                policies.remove(policy.getName());
-                handlers.remove(policy.getName());
-            }
-        }
-    }
-
-    private void inplaceRemove(Map<String, PolicyDefinition> policies, 
Map<String, PolicyStreamHandler> handlers, PolicyDefinition policy)  {
-        if(handlers.containsKey(policy.getName())) {
-            PolicyStreamHandler handler = handlers.get(policy.getName());
-            try {
-                handler.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close handler {}",handler,e);
-            }finally {
-                policies.remove(policy.getName());
-                handlers.remove(policy.getName());
-                LOG.info("Removed policy: {}",policy);
-            }
-        } else {
-            LOG.error("metadata calculation error, try to remove nonexisting 
PolicyDefinition: "+policy);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
deleted file mode 100644
index 5b83abb..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
-
-import com.google.common.base.Preconditions;
-
-public class SiddhiDefinitionAdapter {
-    private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiDefinitionAdapter.class);
-    public final static String DEFINE_STREAM_TEMPLATE =  "define stream %s ( 
%s );";
-
-    public static String buildStreamDefinition(StreamDefinition 
streamDefinition){
-        List<String> columns = new ArrayList<>();
-        Preconditions.checkNotNull(streamDefinition,"StreamDefinition is 
null");
-        if(streamDefinition.getColumns()!=null) {
-            for (StreamColumn column : streamDefinition.getColumns()) {
-                columns.add(String.format("%s %s", column.getName(), 
convertToSiddhiAttributeType(column.getType()).toString().toLowerCase()));
-            }
-        }else{
-            LOG.warn("No columns found for stream 
{}"+streamDefinition.getStreamId());
-        }
-        return String.format(DEFINE_STREAM_TEMPLATE, 
streamDefinition.getStreamId(),StringUtils.join(columns,","));
-    }
-
-    public static Attribute.Type 
convertToSiddhiAttributeType(StreamColumn.Type type){
-        if(_EAGLE_SIDDHI_TYPE_MAPPING.containsKey(type)){
-            return _EAGLE_SIDDHI_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown stream type: "+type);
-    }
-
-    public static Class<?> convertToJavaAttributeType(StreamColumn.Type type){
-        if(_EAGLE_JAVA_TYPE_MAPPING.containsKey(type)){
-            return _EAGLE_JAVA_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown stream type: "+type);
-    }
-
-    public static StreamColumn.Type convertFromJavaAttributeType(Class<?> 
type){
-        if(_JAVA_EAGLE_TYPE_MAPPING.containsKey(type)){
-            return _JAVA_EAGLE_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown stream type: "+type);
-    }
-
-    public static StreamColumn.Type 
convertFromSiddhiAttributeType(Attribute.Type type){
-        if(_SIDDHI_EAGLE_TYPE_MAPPING.containsKey(type)){
-            return _SIDDHI_EAGLE_TYPE_MAPPING.get(type);
-        }
-
-        throw new IllegalArgumentException("Unknown siddhi type: "+type);
-    }
-
-    /**
-     * public enum Type {
-     *   STRING, INT, LONG, FLOAT, DOUBLE, BOOL, OBJECT
-     * }
-     */
-    private final static Map<StreamColumn.Type,Attribute.Type> 
_EAGLE_SIDDHI_TYPE_MAPPING = new HashMap<>();
-    private final static Map<StreamColumn.Type,Class<?>> 
_EAGLE_JAVA_TYPE_MAPPING = new HashMap<>();
-    private final static Map<Class<?>,StreamColumn.Type> 
_JAVA_EAGLE_TYPE_MAPPING = new HashMap<>();
-    private final static Map<Attribute.Type,StreamColumn.Type> 
_SIDDHI_EAGLE_TYPE_MAPPING = new HashMap<>();
-
-    static {
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.STRING,Attribute.Type.STRING);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.INT,Attribute.Type.INT);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.LONG,Attribute.Type.LONG);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Attribute.Type.FLOAT);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Attribute.Type.DOUBLE);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Attribute.Type.BOOL);
-        
_EAGLE_SIDDHI_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Attribute.Type.OBJECT);
-
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.STRING,String.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.INT,Integer.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.LONG,Long.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.FLOAT,Float.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.DOUBLE,Double.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.BOOL,Boolean.class);
-        _EAGLE_JAVA_TYPE_MAPPING.put(StreamColumn.Type.OBJECT,Object.class);
-
-        _JAVA_EAGLE_TYPE_MAPPING.put(String.class,StreamColumn.Type.STRING);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Integer.class,StreamColumn.Type.INT);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Long.class,StreamColumn.Type.LONG);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Float.class,StreamColumn.Type.FLOAT);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Double.class,StreamColumn.Type.DOUBLE);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Boolean.class,StreamColumn.Type.BOOL);
-        _JAVA_EAGLE_TYPE_MAPPING.put(Object.class,StreamColumn.Type.OBJECT);
-
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.STRING,StreamColumn.Type.STRING);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.INT,StreamColumn.Type.INT);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.LONG,StreamColumn.Type.LONG);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.FLOAT,StreamColumn.Type.FLOAT);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.DOUBLE,StreamColumn.Type.DOUBLE);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.BOOL,StreamColumn.Type.BOOL);
-        
_SIDDHI_EAGLE_TYPE_MAPPING.put(Attribute.Type.OBJECT,StreamColumn.Type.OBJECT);
-    }
-
-    public static StreamDefinition 
convertFromSiddiDefinition(AbstractDefinition siddhiDefinition){
-        StreamDefinition streamDefinition = new StreamDefinition();
-        streamDefinition.setStreamId(siddhiDefinition.getId());
-        List<StreamColumn> columns = new 
ArrayList<>(siddhiDefinition.getAttributeNameArray().length);
-        for(Attribute attribute:siddhiDefinition.getAttributeList()){
-            StreamColumn column = new StreamColumn();
-            
column.setType(convertFromSiddhiAttributeType(attribute.getType()));
-            column.setName(attribute.getName());
-            columns.add(column);
-        }
-        streamDefinition.setColumns(columns);
-        streamDefinition.setTimeseries(true);
-        streamDefinition.setDescription("Auto-generated stream schema from 
siddhi for "+siddhiDefinition.getId());
-        return streamDefinition;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
deleted file mode 100755
index dfa5612..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.engine.evaluator.impl;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.engine.Collector;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import 
org.apache.eagle.alert.engine.coordinator.StreamDefinitionNotFoundException;
-import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
-import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-public class SiddhiPolicyHandler implements PolicyStreamHandler {
-    private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiPolicyHandler.class);
-    private ExecutionPlanRuntime executionRuntime;
-    private SiddhiManager siddhiManager;
-    private Map<String, StreamDefinition> sds;
-    private PolicyDefinition policy;
-    private PolicyHandlerContext context;
-
-    public SiddhiPolicyHandler(Map<String, StreamDefinition> sds){
-        this.sds = sds;
-    }
-
-    private static String generateExecutionPlan(PolicyDefinition 
policyDefinition, Map<String, StreamDefinition> sds) throws 
StreamDefinitionNotFoundException {
-        StringBuilder builder = new StringBuilder();
-        for(String inputStream:policyDefinition.getInputStreams()) {
-            
builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream)));
-            builder.append("\n");
-        }
-        builder.append(policyDefinition.getDefinition().value);
-        if(LOG.isDebugEnabled()) LOG.debug("Generated siddhi execution plan: 
{} from policy: {}", builder.toString(),policyDefinition);
-        return builder.toString();
-    }
-
-    private static class AlertStreamCallback extends StreamCallback{
-        private final String outputStream;
-        private final Collector<AlertStreamEvent> collector;
-        private final PolicyHandlerContext context;
-        private final StreamDefinition definition;
-
-        public AlertStreamCallback(String outputStream, StreamDefinition 
streamDefinition, Collector<AlertStreamEvent> collector, PolicyHandlerContext 
context){
-            this.outputStream = outputStream;
-            this.collector = collector;
-            this.context = context;
-            this.definition = streamDefinition;
-        }
-
-        /**
-         * Possibly more than one event will be triggered for alerting
-         * @param events
-         */
-        @Override
-        public void receive(Event[] events) {
-            LOG.info("Generated {} alerts from policy '{}' in {}", 
events.length,context.getPolicyDefinition().getName(), 
context.getPolicyEvaluatorId());
-            for(Event e : events) {
-                AlertStreamEvent event = new AlertStreamEvent();
-                event.setTimestamp(e.getTimestamp());
-                event.setData(e.getData());
-                event.setStreamId(outputStream);
-                event.setPolicy(context.getPolicyDefinition());
-                if (this.context.getParentEvaluator() != null) {
-                    event.setCreatedBy(context.getParentEvaluator().getName());
-                }
-                event.setCreatedTime(System.currentTimeMillis());
-                event.setSchema(definition);
-                if(LOG.isDebugEnabled())
-                    LOG.debug("Generate new alert event: {}", event);
-                collector.emit(event);
-            }
-            
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"alert_count")).incrBy(events.length);
-        }
-    }
-
-    @Override
-    public void prepare(final Collector<AlertStreamEvent> collector, 
PolicyHandlerContext context) throws Exception {
-        LOG.info("Initializing handler for policy {}: 
{}",context.getPolicyDefinition(),this);
-        this.policy = context.getPolicyDefinition();
-        this.siddhiManager = new SiddhiManager();
-        String plan = generateExecutionPlan(policy, sds);
-        try {
-            this.executionRuntime = 
siddhiManager.createExecutionPlanRuntime(plan);
-            LOG.info("Created siddhi runtime {}",executionRuntime.getName());
-        }catch (Exception parserException){
-            LOG.error("Failed to create siddhi runtime for policy: {}, siddhi 
plan: \n\n{}\n",context.getPolicyDefinition().getName(),plan,parserException);
-            throw parserException;
-        }
-        for(final String outputStream:policy.getOutputStreams()){
-            
if(executionRuntime.getStreamDefinitionMap().containsKey(outputStream)) {
-                this.executionRuntime.addCallback(outputStream,
-                        new AlertStreamCallback(
-                        outputStream, 
SiddhiDefinitionAdapter.convertFromSiddiDefinition(executionRuntime.getStreamDefinitionMap().get(outputStream))
-                        ,collector, context));
-            } else {
-                throw new IllegalStateException("Undefined output stream 
"+outputStream);
-            }
-        }
-        this.executionRuntime.start();
-        this.context = context;
-        LOG.info("Initialized policy handler for policy: {}",policy.getName());
-    }
-
-    public void send(StreamEvent event) throws Exception {
-        
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"receive_count")).incr();
-        String streamId = event.getStreamId();
-        InputHandler inputHandler = executionRuntime.getInputHandler(streamId);
-        if(inputHandler != null){
-            
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"eval_count")).incr();
-            inputHandler.send(event.getTimestamp(),event.getData());
-        }else{
-            
context.getPolicyCounter().scope(String.format("%s.%s",this.context.getPolicyDefinition().getName(),"drop_count")).incr();
-            LOG.warn("No input handler found for stream {}",streamId);
-        }
-    }
-
-    public void close() {
-        LOG.info("Closing handler for policy {}",this.policy.getName());
-        this.executionRuntime.shutdown();
-        LOG.info("Shutdown siddhi runtime {}",this.executionRuntime.getName());
-        this.siddhiManager.shutdown();
-        LOG.info("Shutdown siddhi manager {}",this.siddhiManager);
-        LOG.info("Closed handler for policy {}",this.policy.getName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
deleted file mode 100644
index b894cc0..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertDeduplicator.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher;
-
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-/**
- * Dedup Eagle entities.
- */
-public interface AlertDeduplicator {
-
-       AlertStreamEvent dedup(AlertStreamEvent event);
-
-       void setDedupIntervalMin(String intervalMin);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
deleted file mode 100644
index 8f1c248..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.eagle.alert.engine.publisher;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface AlertPublishListener {
-    void onPublishChange(List<Publishment> added,
-                         List<Publishment> removed,
-                         List<Publishment> afterModified,
-                         List<Publishment> beforeModified);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
deleted file mode 100644
index 644fe2b..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishPlugin.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher;
-
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.impl.PublishStatus;
-
-import com.typesafe.config.Config;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * Created on 2/10/16.
- * Notification Plug-in interface which provide abstraction layer to notify to 
different system
- */
-public interface AlertPublishPlugin extends Closeable {
-    /**
-     * for initialization
-     * @throws Exception
-     */
-    void init(Config config, Publishment publishment) throws Exception;
-
-    void update(String dedupIntervalMin, Map<String, String> pluginProperties);
-
-    void close();
-
-    void onAlert(AlertStreamEvent event) throws Exception;
-
-    AlertStreamEvent dedup(AlertStreamEvent event);
-
-    PublishStatus getStatus();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
deleted file mode 100644
index fc1fc28..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublishSpecListener.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.eagle.alert.engine.publisher;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface AlertPublishSpecListener {
-    void onAlertPublishSpecChange(PublishSpec spec, Map<String, 
StreamDefinition> sds);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
deleted file mode 100644
index 7a44009..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertPublisher.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.eagle.alert.engine.publisher;
-
-
-import java.io.Serializable;
-
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-
-import com.typesafe.config.Config;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface AlertPublisher extends AlertPublishListener, Serializable {
-    void init(Config config);
-    String getName();
-    void nextEvent(AlertStreamEvent event);
-    void close();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
deleted file mode 100644
index 91c9296..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/AlertSink.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.eagle.alert.engine.publisher;
-
-import java.io.Serializable;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface AlertSink extends Serializable {
-    /**
-     *
-     * @throws Exception
-     */
-    void open() throws Exception;
-
-    /**
-     *
-     * @throws Exception
-     */
-    void close() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
deleted file mode 100644
index 54a9afa..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher;
-
-public class PublishConstants {
-    public static final String NOTIFICATION_TYPE = "type";
-    public static final String EMAIL_NOTIFICATION = "email";
-    public static final String KAFKA_STORE = "kafka";
-    public static final String EAGLE_STORE = "eagleStore";
-
-    // email specific constants
-    public static final String SUBJECT = "subject";
-    public static final String SENDER = "sender";
-    public static final String RECIPIENTS = "recipients";
-    public static final String TEMPLATE = "template";
-
-    // kafka specific constants
-    public static final String TOPIC = "topic";
-    public static final String BROKER_LIST = "kafka_broker";
-
-    public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp";
-    public static final String ALERT_EMAIL_COUNT_PROPERTY = "count";
-    public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
-    public static final String ALERT_EMAIL_ORIGIN_PROPERTY = 
"alertEmailOrigin";
-
-    public static final String ALERT_EMAIL_MESSAGE = "alertMessage";
-    public static final String ALERT_EMAIL_STREAM = "streamId";
-    public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
-    public static final String ALERT_EMAIL_POLICY = "policyId";
-    public static final String ALERT_EMAIL_CREATOR = "creator";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
deleted file mode 100644
index 7d31bb5..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailConstants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.email;
-
-public class AlertEmailConstants {
-
-    public static final String MAIL_AUTH = "mail.smtp.auth";
-    public static final String MAIL_HOST = "mail.smtp.host";
-    public static final String MAIL_PORT = "mail.smtp.port";
-
-    public static final String CONN_PLAINTEXT = "plaintext";
-    public static final String CONN_TLS = "tls";
-    public static final String CONN_SSL = "ssl";
-
-    public static final String CONF_MAIL_SERVER = "smtp.server";
-    public static final String CONF_MAIL_PORT = "smtp.port";
-    public static final String CONF_MAIL_CONN = "connection";
-    public static final String CONF_MAIL_DEBUG = "mailDebug";
-    public static final String CONF_MAIL_AUTH = "smtp.auth.enable";
-    public static final String CONF_AUTH_USER = "auth.username";
-    public static final String CONF_AUTH_PASSWORD = "auth.password";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
deleted file mode 100644
index 8723283..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailContext.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.email;
-
-import java.util.Map;
-
-/**
- * alert email bean
- * one email consists of a list of email component
- */
-public class AlertEmailContext {
-    private Map<String, String> alertContext;
-    private String sender;
-    private String subject;
-    private String recipients;
-    private String velocityTplFile;
-    private String cc;
-
-    public Map<String, String> getAlertContext() {
-        return alertContext;
-    }
-
-    public void setAlertContext(Map<String, String> alertContext) {
-        this.alertContext = alertContext;
-    }
-    public String getVelocityTplFile() {
-        return velocityTplFile;
-    }
-    public void setVelocityTplFile(String velocityTplFile) {
-        this.velocityTplFile = velocityTplFile;
-    }
-    public String getRecipients() {
-        return recipients;
-    }
-    public void setRecipients(String recipients) {
-        this.recipients = recipients;
-    }
-    public String getSender() {
-        return sender;
-    }
-    public void setSender(String sender) {
-        this.sender = sender;
-    }
-    public String getSubject() {
-        return subject;
-    }
-    public void setSubject(String subject) {
-        this.subject = subject;
-    }
-    public String getCc() {
-        return cc;
-    }
-    public void setCc(String cc) {
-        this.cc = cc;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
deleted file mode 100644
index 0c68629..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-/**
- *
- */
-package org.apache.eagle.alert.engine.publisher.email;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailGenerator {
-    private String tplFile;
-    private String sender;
-    private String recipients;
-    private String subject;
-    private Map<String, String> properties;
-
-    private ThreadPoolExecutor executorPool;
-
-    private final static Logger LOG = 
LoggerFactory.getLogger(AlertEmailGenerator.class);
-
-    private final static long MAX_TIMEOUT_MS =60000;
-
-    public boolean sendAlertEmail(AlertStreamEvent entity) {
-        return sendAlertEmail(entity, recipients, null);
-    }
-
-    public boolean sendAlertEmail(AlertStreamEvent entity, String recipients) {
-        return sendAlertEmail(entity, recipients, null);
-    }
-
-    public boolean sendAlertEmail(AlertStreamEvent event, String recipients, 
String cc) {
-        AlertEmailContext email = new AlertEmailContext();
-        Map<String, String> alertContext = buildAlertContext(event);
-        email.setAlertContext(alertContext);
-        email.setVelocityTplFile(tplFile);
-        email.setSubject(subject);
-        email.setSender(sender);
-        email.setRecipients(recipients);
-        email.setCc(cc);
-        
-        /** asynchronized email sending */
-        AlertEmailSender thread = new AlertEmailSender(email, properties);
-
-        if(this.executorPool == null) throw new 
IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
-        LOG.info("Sending email  in asynchronous to: " + recipients + ", cc: " 
+ cc);
-        Future<?> future = this.executorPool.submit(thread);
-        Boolean status;
-        try {
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            status = true;
-            //LOG.info(String.format("Successfully send email to %s", 
recipients));
-        } catch (InterruptedException | ExecutionException e) {
-            status = false;
-            LOG.error(String.format("Failed to send email to %s, due to:%s", 
recipients, e),e);
-        } catch (TimeoutException e) {
-            status = false;
-            LOG.error(String.format("Failed to send email to %s due to timeout 
exception, max timeout: %s ms ", recipients, MAX_TIMEOUT_MS),e);
-        }
-        return status;
-    }
-
-    private Map<String, String> buildAlertContext(AlertStreamEvent event) {
-        Map<String, String> alertContext = new HashMap<>();
-        alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, 
event.toString());
-        alertContext.put(PublishConstants.ALERT_EMAIL_POLICY, 
event.getPolicyId());
-        alertContext.put(PublishConstants.ALERT_EMAIL_TIMESTAMP, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
-        alertContext.put(PublishConstants.ALERT_EMAIL_STREAM, 
event.getStreamId());
-        alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, 
event.getCreatedBy());
-        return alertContext;
-    }
-
-    public String getTplFile() {
-        return tplFile;
-    }
-
-    public void setTplFile(String tplFile) {
-        this.tplFile = tplFile;
-    }
-
-    public String getSender() {
-        return sender;
-    }
-
-    public void setSender(String sender) {
-        this.sender = sender;
-    }
-
-    public String getRecipients() {
-        return recipients;
-    }
-
-    public void setRecipients(String recipients) {
-        this.recipients = recipients;
-    }
-
-    public String getSubject() {
-        return subject;
-    }
-
-    public void setSubject(String subject) {
-        this.subject = subject;
-    }
-
-    public Map<String, String> getProperties() {
-        return properties;
-    }
-
-    public void setProperties(Map<String, String> properties) {
-        this.properties = properties;
-    }
-
-    public void setExecutorPool(ThreadPoolExecutor executorPool) {
-        this.executorPool = executorPool;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
deleted file mode 100644
index e9635dd..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGeneratorBuilder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.email;
-
-import java.util.Map;
-import java.util.concurrent.ThreadPoolExecutor;
-
-public class AlertEmailGeneratorBuilder {
-    private AlertEmailGenerator generator;
-    private AlertEmailGeneratorBuilder(){
-        generator = new AlertEmailGenerator();
-    }
-    public static AlertEmailGeneratorBuilder newBuilder(){
-        return new AlertEmailGeneratorBuilder();
-    }
-    public AlertEmailGeneratorBuilder withSubject(String subject){
-        generator.setSubject(subject);
-        return this;
-    }
-    public AlertEmailGeneratorBuilder withSender(String sender){
-        generator.setSender(sender);
-        return this;
-    }
-    public AlertEmailGeneratorBuilder withRecipients(String recipients){
-        generator.setRecipients(recipients);
-        return this;
-    }
-    public AlertEmailGeneratorBuilder withTplFile(String tplFile){
-        generator.setTplFile(tplFile);
-        return this;
-    }
-    public AlertEmailGeneratorBuilder withMailProps(Map<String, String> 
mailProps) {
-        generator.setProperties(mailProps);
-        return this;
-    }
-    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor 
threadPoolExecutor) {
-        generator.setExecutorPool(threadPoolExecutor);
-        return this;
-    }
-
-    public AlertEmailGenerator build(){
-        return this.generator;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
deleted file mode 100644
index d0e5cf6..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailSender.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.email;
-
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.eagle.alert.engine.publisher.PublishConstants;
-import org.apache.eagle.alert.utils.DateTimeUtil;
-import org.apache.velocity.VelocityContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailSender implements Runnable {
-
-    protected final List<Map<String, String>> alertContexts = new 
ArrayList<Map<String, String>>();
-    protected final String configFileName;
-    protected final String subject;
-    protected final String sender;
-    protected final String recipients;
-    protected final String cc;
-    protected final String origin;
-    protected boolean sentSuccessfully = false;
-
-    private final static Logger LOG = 
LoggerFactory.getLogger(AlertEmailSender.class);
-    private final static int MAX_RETRY_COUNT = 3;
-
-
-
-    private Map<String, String> mailProps;
-
-
-    private String threadName;
-    /**
-     * Derived class may have some additional context properties to add
-     * @param context velocity context
-     * @param env environment
-     */
-    protected void additionalContext(VelocityContext context, String env) {
-        // By default there's no additional context added
-    }
-
-    public AlertEmailSender(AlertEmailContext alertEmail){
-        this.recipients = alertEmail.getRecipients();
-        this.configFileName = alertEmail.getVelocityTplFile();
-        this.subject = alertEmail.getSubject();
-        this.sender = alertEmail.getSender();
-        this.cc = alertEmail.getCc();
-
-        this.alertContexts.add(alertEmail.getAlertContext());
-        String tmp = ManagementFactory.getRuntimeMXBean().getName();
-        this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
-        threadName = Thread.currentThread().getName();
-        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", 
recipient of the email: " + this.recipients +", velocity TPL file: " + 
this.configFileName);
-    }
-
-    public AlertEmailSender(AlertEmailContext alertEmail, Map<String, String> 
mailProps){
-        this(alertEmail);
-        this.mailProps = mailProps;
-    }
-
-    private Properties parseMailClientConfig(Map<String, String> mailProps) {
-        if (mailProps == null) return null;
-        Properties props = new Properties();
-        String mailHost = mailProps.get(AlertEmailConstants.CONF_MAIL_SERVER);
-        String mailPort = mailProps.get(AlertEmailConstants.CONF_MAIL_PORT);
-        if (mailHost == null || mailPort == null || mailHost.isEmpty()) {
-            LOG.warn("SMTP server is unset, will exit");
-            return null;
-        }
-        props.put(AlertEmailConstants.MAIL_HOST, mailHost);
-        props.put(AlertEmailConstants.MAIL_PORT, mailPort);
-
-        String smtpAuth = 
mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_AUTH, "false");
-        props.put(AlertEmailConstants.MAIL_AUTH, smtpAuth);
-        if (Boolean.parseBoolean(smtpAuth)) {
-            props.put(AlertEmailConstants.CONF_AUTH_USER, 
mailProps.get(AlertEmailConstants.CONF_AUTH_USER));
-            props.put(AlertEmailConstants.CONF_AUTH_PASSWORD, 
mailProps.get(AlertEmailConstants.CONF_AUTH_PASSWORD));
-        }
-
-        String smtpConn = 
mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_CONN, 
AlertEmailConstants.CONN_PLAINTEXT);
-        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_TLS)) {
-            props.put("mail.smtp.starttls.enable", "true");
-        }
-        if (smtpConn.equalsIgnoreCase(AlertEmailConstants.CONN_SSL)) {
-            props.put("mail.smtp.socketFactory.port", "465");
-            props.put("mail.smtp.socketFactory.class",
-                    "javax.net.ssl.SSLSocketFactory");
-        }
-        props.put(AlertEmailConstants.CONF_MAIL_DEBUG, 
mailProps.getOrDefault(AlertEmailConstants.CONF_MAIL_DEBUG, "false"));
-        return props;
-    }
-
-    @Override
-    public void run() {
-        int count = 0;
-        boolean success = false;
-        while(count++ < MAX_RETRY_COUNT && !success){
-            LOG.info("Sending email, tried: " + count+", max: " + 
MAX_RETRY_COUNT);
-            try {
-                final EagleMailClient client;
-                if (mailProps != null) {
-                    Properties props = parseMailClientConfig(mailProps);
-                    client = new EagleMailClient(props);
-                }
-                else {
-                    client = new EagleMailClient();
-                }
-
-                final VelocityContext context = new VelocityContext();
-                generateCommonContext(context);
-                LOG.info("After calling generateCommonContext...");
-
-                if (recipients == null || recipients.equals("")) {
-                    LOG.error("Recipients is null, skip sending emails ");
-                    return;
-                }
-                String title = subject;
-
-                success = client.send(sender, recipients, cc, title, 
configFileName, context, null);
-                LOG.info("Success of sending email: " + success);
-                if(!success && count < MAX_RETRY_COUNT) {
-                    LOG.info("Sleep for a while before retrying");
-                    Thread.sleep(10 * 1000);
-                }
-            }
-            catch (Exception e){
-                LOG.warn("Sending mail exception", e);
-            }
-        }
-        if (success) {
-            sentSuccessfully = true;
-            LOG.info(String.format("Successfully send email, thread: %s", 
threadName));
-        } else{
-            LOG.warn(String.format("Fail sending email after tries %s times, 
thread: %s", MAX_RETRY_COUNT, threadName));
-        }
-    }
-
-    private void generateCommonContext(VelocityContext context) {
-        context.put(PublishConstants.ALERT_EMAIL_TIME_PROPERTY, 
DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
-        context.put(PublishConstants.ALERT_EMAIL_COUNT_PROPERTY, 
alertContexts.size());
-        context.put(PublishConstants.ALERT_EMAIL_ALERTLIST_PROPERTY, 
alertContexts);
-        context.put(PublishConstants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
deleted file mode 100755
index 61194a2..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/email/EagleMailClient.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.eagle.alert.engine.publisher.email;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.activation.DataHandler;
-import javax.activation.DataSource;
-import javax.activation.FileDataSource;
-import javax.mail.Authenticator;
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Multipart;
-import javax.mail.PasswordAuthentication;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.AddressException;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
-import javax.mail.internet.MimeMultipart;
-
-import org.apache.velocity.Template;
-import org.apache.velocity.VelocityContext;
-import org.apache.velocity.app.VelocityEngine;
-import org.apache.velocity.exception.ResourceNotFoundException;
-import org.apache.velocity.runtime.RuntimeConstants;
-import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class EagleMailClient {
-//     private static final String CONFIG_FILE = "config.properties";
-       private static final String BASE_PATH = "templates/";
-
-       private VelocityEngine velocityEngine;
-       private Session session;
-       private static final Logger LOG = 
LoggerFactory.getLogger(EagleMailClient.class);
-
-       public EagleMailClient() {
-               this(new Properties());
-       }
-       
-       public EagleMailClient(final Properties config) {
-               try {
-                       velocityEngine = new VelocityEngine();
-                       
velocityEngine.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
-                       
velocityEngine.setProperty("classpath.resource.loader.class", 
ClasspathResourceLoader.class.getName());
-                       velocityEngine.init();
-
-                       config.put("mail.transport.protocol", "smtp");
-                       
if(Boolean.parseBoolean(config.getProperty(AlertEmailConstants.CONF_MAIL_AUTH))){
-                               session = Session.getDefaultInstance(config, 
new Authenticator() {
-                                       protected PasswordAuthentication 
getPasswordAuthentication() {
-                                               return new 
PasswordAuthentication(config.getProperty(AlertEmailConstants.CONF_AUTH_USER), 
config.getProperty(AlertEmailConstants.CONF_AUTH_PASSWORD));
-                                       }
-                               });
-                       }
-                       else session = Session.getDefaultInstance(config, new 
Authenticator() {});
-                       final String debugMode =  
config.getProperty(AlertEmailConstants.CONF_MAIL_DEBUG, "false");
-                       final boolean debug =  Boolean.parseBoolean(debugMode);
-                       session.setDebug(debug);
-               } catch (Exception e) {
-            LOG.error("Failed connect to smtp server",e);
-               }
-       }
-
-       private boolean _send(String from, String to, String cc, String title,
-                       String content) {
-               Message msg = new MimeMessage(session);
-               try {
-                       msg.setFrom(new InternetAddress(from));
-                       msg.setSubject(title);
-                       if (to != null) {
-                               msg.setRecipients(Message.RecipientType.TO,
-                                               InternetAddress.parse(to));
-                       }
-                       if (cc != null) {
-                               msg.setRecipients(Message.RecipientType.CC,
-                                               InternetAddress.parse(cc));
-                       }
-                       //msg.setRecipients(Message.RecipientType.BCC, 
InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-                       msg.setContent(content, "text/html;charset=utf-8");
-                       LOG.info(String.format("Going to send mail: from[%s], 
to[%s], cc[%s], title[%s]", from, to, cc, title));
-                       Transport.send(msg);
-                       return true;
-               } catch (AddressException e) {
-                       LOG.info("Send mail failed, got an AddressException: " 
+ e.getMessage(), e);
-                       return false;
-               } catch (MessagingException e) {
-                       LOG.info("Send mail failed, got an AddressException: " 
+ e.getMessage(), e);
-                       return false;
-               }
-       }
-
-       private boolean _send(String from,String to,String cc,String 
title,String content,List<MimeBodyPart> attachments){
-               MimeMessage  mail = new MimeMessage(session);
-               try {
-                       mail.setFrom(new InternetAddress(from));
-                       mail.setSubject(title);
-                       if (to != null) {
-                               mail.setRecipients(Message.RecipientType.TO,
-                                               InternetAddress.parse(to));
-                       }
-                       if (cc != null) {
-                               mail.setRecipients(Message.RecipientType.CC,
-                                               InternetAddress.parse(cc));
-                       }
-                       
-                       //mail.setRecipients(Message.RecipientType.BCC, 
InternetAddress.parse(DEFAULT_BCC_ADDRESS));
-
-                       MimeBodyPart mimeBodyPart = new MimeBodyPart();
-                       
mimeBodyPart.setContent(content,"text/html;charset=utf-8");
-
-                       Multipart  multipart = new MimeMultipart();
-                       multipart.addBodyPart(mimeBodyPart);
-
-                       for(MimeBodyPart attachment:attachments){
-                               multipart.addBodyPart(attachment);
-                       }
-
-                       mail.setContent(multipart);
-//                     mail.setContent(content, "text/html;charset=utf-8");
-                       LOG.info(String.format("Going to send mail: from[%s], 
to[%s], cc[%s], title[%s]", from, to, cc, title));
-                       Transport.send(mail);
-                       return true;
-               } catch (AddressException e) {
-                       LOG.info("Send mail failed, got an AddressException: " 
+ e.getMessage(), e);
-                       return false;
-               } catch (MessagingException e) {
-                       LOG.info("Send mail failed, got an AddressException: " 
+ e.getMessage(), e);
-                       return false;
-               }
-       }
-
-       public boolean send(String from, String to, String cc, String title,
-                       String content) {
-               return this._send(from, to, cc, title, content);
-       }
-
-       public boolean send(String from, String to, String cc, String title,
-                       String templatePath, VelocityContext context) {
-               Template t = null;
-               try {
-                       t = velocityEngine.getTemplate(BASE_PATH + 
templatePath);
-               } catch (ResourceNotFoundException ex) {
-               }
-               if (t == null) {
-                       try {
-                               t = velocityEngine.getTemplate(templatePath);
-                       } catch (ResourceNotFoundException e) {
-                               t = velocityEngine.getTemplate("/" + 
templatePath);
-                       }
-               }
-               final StringWriter writer = new StringWriter();
-               t.merge(context, writer);
-               if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
-               return this.send(from, to, cc, title, writer.toString());
-       }
-
-       public boolean send(String from, String to, String cc, String title,
-                           String templatePath, VelocityContext context, 
Map<String,File> attachments) {
-               if (attachments == null || attachments.isEmpty()) {
-                       return send(from, to, cc, title, templatePath, context);
-               }
-               Template t = null;
-
-               List<MimeBodyPart> mimeBodyParts = new 
ArrayList<MimeBodyPart>();
-               Map<String,String> cid = new HashMap<String,String>();
-
-               for (Map.Entry<String,File> entry : attachments.entrySet()) {
-                       final String attachment = entry.getKey();
-                       final File attachmentFile  = entry.getValue();
-                       final MimeBodyPart mimeBodyPart = new MimeBodyPart();
-                       if(attachmentFile !=null && attachmentFile.exists()){
-                               DataSource source = new 
FileDataSource(attachmentFile);
-                               try {
-                                       mimeBodyPart.setDataHandler(new 
DataHandler(source));
-                                       mimeBodyPart.setFileName(attachment);
-                                       
mimeBodyPart.setDisposition(MimeBodyPart.ATTACHMENT);
-                                       mimeBodyPart.setContentID(attachment);
-                                       
cid.put(attachment,mimeBodyPart.getContentID());
-                                       mimeBodyParts.add(mimeBodyPart);
-                               } catch (MessagingException e) {
-                                       LOG.error("Generate mail failed, got 
exception while attaching files: " + e.getMessage(), e);
-                               }
-                       }else{
-                               LOG.error("Attachment: " + attachment + " is 
null or not exists");
-                       }
-               }
-               //TODO remove cid, because not used at all
-               if(LOG.isDebugEnabled()) LOG.debug("Cid maps: "+cid);
-               context.put("cid", cid);
-
-               try {
-                       t = velocityEngine.getTemplate(BASE_PATH + 
templatePath);
-               } catch (ResourceNotFoundException ex) {
-//                     LOGGER.error("Template not found:"+BASE_PATH + 
templatePath, ex);
-               }
-
-               if (t == null) {
-                       try {
-                               t = velocityEngine.getTemplate(templatePath);
-                       } catch (ResourceNotFoundException e) {
-                               try {
-                                       t = velocityEngine.getTemplate("/" + 
templatePath);
-                               }
-                               catch (Exception ex) {
-                                       LOG.error("Template not found:"+ "/" + 
templatePath, ex);
-                               }
-                       }
-               }
-
-               final StringWriter writer = new StringWriter();
-               t.merge(context, writer);
-               if(LOG.isDebugEnabled()) LOG.debug(writer.toString());
-               return this._send(from, to, cc, title, writer.toString(), 
mimeBodyParts);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
deleted file mode 100644
index 2a4e332..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePersister.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.List;
-
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * Alert API entity Persistor
- */
-public class AlertEagleStorePersister {
-       private static Logger LOG = 
LoggerFactory.getLogger(AlertEagleStorePersister.class);
-       private Config config;
-
-       public AlertEagleStorePersister(Config config) {
-               this.config = config;
-       }
-
-       /**
-        * Persist passes list of Entities
-        * @param list
-        * @return
-     */
-       public boolean doPersist(List<? extends StreamEvent> list) {
-               if (list.isEmpty()) return false;
-               LOG.info("Going to persist entities, type: " + " " + 
list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
-               try {
-                       IMetadataServiceClient client = new 
MetadataServiceClientImpl(config);
-                       // TODO: metadata service support
-               }
-               catch (Exception ex) {
-                       LOG.error("Got an exception in persisting entities", 
ex);
-                       return false;
-               }
-               return false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
 
b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
deleted file mode 100644
index 807aacc..0000000
--- 
a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEagleStorePublisher.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.engine.publisher.impl;
-
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.model.AlertStreamEvent;
-import org.apache.eagle.alert.engine.publisher.AlertDeduplicator;
-import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * Plugin to persist alerts to Eagle Storage
- */
-public class AlertEagleStorePublisher implements AlertPublishPlugin {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(AlertEagleStorePublisher.class);
-    private PublishStatus status;
-    private AlertEagleStorePersister persist;
-    private AlertDeduplicator deduplicator;
-
-    @Override
-    public void init(Config config, Publishment publishment) throws Exception {
-        this.persist = new AlertEagleStorePersister(config);
-        deduplicator = new 
DefaultDeduplicator(publishment.getDedupIntervalMin());
-        LOG.info("initialized plugin for EagleStorePlugin");
-    }
-
-    @Override
-    public void update(String dedupIntervalMin, Map<String, String> 
pluginProperties) {
-        deduplicator.setDedupIntervalMin(dedupIntervalMin);
-    }
-
-    @Override
-    public PublishStatus getStatus() {
-        return this.status;
-    }
-
-    @Override
-    public AlertStreamEvent dedup(AlertStreamEvent event) {
-        return deduplicator.dedup(event);
-    }
-
-    /**
-     * Persist AlertEntity to alert_details table
-     * @param event
-     */
-    @Override
-    public void onAlert(AlertStreamEvent event) {
-        LOG.info("write alert to eagle storage " + event);
-        event = dedup(event);
-        if(event == null) {
-            return;
-        }
-        PublishStatus status = new PublishStatus();
-        try{
-            boolean result = persist.doPersist(Arrays.asList(event));
-            if(result) {
-                status.successful = true;
-                status.errorMessage = "";
-            }else{
-                status.successful = false;
-                status.errorMessage = "";
-            }
-        }catch (Exception ex ){
-            status.successful = false;
-            status.errorMessage = ex.getMessage();
-            LOG.error("Fail writing alert entity to Eagle Store", ex);
-        }
-        this.status = status;
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public int hashCode(){
-        return new 
HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
-    }
-
-    @Override
-    public boolean equals(Object o){
-        if(o == this)
-            return true;
-        if(!(o instanceof AlertEagleStorePublisher))
-            return false;
-        return true;
-    }
-}
\ No newline at end of file


Reply via email to