http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
deleted file mode 100644
index 30d2179..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.executor;
-
-import com.codahale.metrics.MetricRegistry;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.time.DateUtils;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.metric.reportor.EagleCounterMetric;
-import org.apache.eagle.metric.reportor.EagleMetricListener;
-import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
-import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
-import org.apache.eagle.policy.*;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * The stream process executor based on two types
- * @since Dec 17, 2015
- *
- * @param <T> - The policy definition entity type
- * @param <K> - The stream entity type
- */
-public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEntity, K>
-               extends JavaStormStreamExecutor2<String, K>
-               implements PolicyLifecycleMethods<T>, PolicyDistributionReportMethods, IPolicyExecutor<T, K>
-{
-       
-       private static final long serialVersionUID = 1L;
-       private static final Logger LOG = LoggerFactory.getLogger(PolicyProcessExecutor.class);
-       
-       public static final String EAGLE_EVENT_COUNT = "eagle.event.count";
-       public static final String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
-       public static final String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
-       public static final String EAGLE_ALERT_COUNT = "eagle.alert.count";
-       public static final String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
-
-       private  static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
-
-       private final Class<T> policyDefinitionClz;
-       private String executorId;
-       private volatile CopyOnWriteHashMap<String, PolicyEvaluator<T>> policyEvaluators;
-       private PolicyPartitioner partitioner;
-       private int numPartitions;
-       private int partitionSeq;
-       private Config config;
-       private Map<String, Map<String, T>> initialAlertDefs;
-       private String[] sourceStreams;
-
-       /**
-        * metricMap's key = metricName[#policyId]
-        */
-       private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
-       private Map<String, String> baseDimensions;
-
-       private MetricRegistry registry;
-       private EagleMetricListener listener;
-
-       private PolicyDefinitionDAO<T> policyDefinitionDao;
-
-       public PolicyProcessExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
-                         PolicyDefinitionDAO<T> alertDefinitionDao, String[] sourceStreams, Class<T> clz){
-               this.executorId = alertExecutorId;
-               this.partitioner = partitioner;
-               this.numPartitions = numPartitions;
-               this.partitionSeq = partitionSeq;
-               this.policyDefinitionDao = alertDefinitionDao;
-               this.sourceStreams = sourceStreams;
-               this.policyDefinitionClz = clz;
-       }
-       
-       public String getExecutorId(){
-               return this.executorId;
-       }
-       
-       public int getNumPartitions() {
-               return this.numPartitions;
-       }
-       
-       public int getPartitionSeq(){
-               return this.partitionSeq;
-       }
-       
-       public PolicyPartitioner getPolicyPartitioner() {
-               return this.partitioner;
-       }
-       
-       public Map<String, Map<String, T>> getInitialAlertDefs() {
-               return this.initialAlertDefs;
-       }
-               
-       public PolicyDefinitionDAO<T> getPolicyDefinitionDao() {
-               return policyDefinitionDao;
-       }
-
-    public Map<String, PolicyEvaluator<T>> getPolicyEvaluators(){
-        return policyEvaluators;
-    }
-       
-       @Override
-       public void prepareConfig(Config config) {
-               this.config = config;
-       }
-       
-       private void initMetricReportor() {
-               String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-               int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-               String username = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) ?
-                                         config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) : null;
-               String password = config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) ?
-                                         config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD) : null;
-               
-               registry = new MetricRegistry();
-               listener = new EagleServiceReporterMetricListener(host, port, username, password);
-               
-               baseDimensions = new HashMap<>();
-               baseDimensions = new HashMap<String, String>();
-               baseDimensions.put(Constants.ALERT_EXECUTOR_ID, executorId);
-               baseDimensions.put(Constants.PARTITIONSEQ, String.valueOf(partitionSeq));
-               baseDimensions.put(Constants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
-               baseDimensions.put(EagleConfigConstants.APPLICATION, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION));
-               baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
-
-               dimensionsMap = new HashMap<String, Map<String, String>>();
-       }
-
-    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
-        return new AlertStreamSchemaDAOImpl(config);
-    }
-
-       @Override
-       public void init() {
-               // initialize StreamMetadataManager before it is used
-               StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
-               // for each AlertDefinition, to create a PolicyEvaluator
-               Map<String, PolicyEvaluator<T>> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator<T>>();
-
-        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
-               String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
-               try {
-                       initialAlertDefs = policyDefinitionDao.findActivePoliciesGroupbyExecutorId(site, application);
-               }
-               catch (Exception ex) {
-                       LOG.error("fail to initialize initialAlertDefs: ", ex);
-            throw new IllegalStateException("fail to initialize 
initialAlertDefs: ", ex);
-               }
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions was found for site: " + site + ", 
application: " + application);
-        }
-        else if (initialAlertDefs.get(executorId) != null) { 
-                       for(T alertDef : initialAlertDefs.get(executorId).values()){
-                               int part = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
-                               if (part == partitionSeq) {
-                                       tmpPolicyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), createPolicyEvaluator(alertDef));
-                               }
-                       }
-               }
-               
-               policyEvaluators = new CopyOnWriteHashMap<>();
-               // for efficiency, we don't put single policy evaluator
-               policyEvaluators.putAll(tmpPolicyEvaluators);
-               DynamicPolicyLoader<T> policyLoader = DynamicPolicyLoader.getInstanceOf(policyDefinitionClz);
-               policyLoader.init(initialAlertDefs, policyDefinitionDao, config);
-        String fullQualifiedAlertExecutorId = executorId + "_" + partitionSeq;
-               policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
-        policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
-               LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
-        LOG.info("All policy evaluators: " + policyEvaluators);
-               
-               initMetricReportor();
-       }
-
-    /**
-     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
-     *
-     * @param alertDef alert definition
-     * @return PolicyEvaluator instance
-     */
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       protected PolicyEvaluator<T> createPolicyEvaluator(T alertDef){
-               String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
-               Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
-               if(evalCls == null){
-                       String msg = "No policy evaluator defined for policy type : " + policyType;
-                       LOG.error(msg);
-                       throw new IllegalStateException(msg);
-               }
-               
-               // check out whether strong incoming data validation is necessary
-        String needValidationConfigKey= Constants.ALERT_EXECUTOR_CONFIGS + "." + executorId + ".needValidation";
-
-        // Default: true
-        boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
-
-               AbstractPolicyDefinition policyDef = null;
-               PolicyEvaluator<T> pe;
-               try {
-                       policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
-                                       PolicyManager.getInstance().getPolicyModules(policyType));
-
-                       PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
-                       context.policyId = alertDef.getTags().get("policyId");
-                       context.alertExecutor = this;
-                       context.resultRender = this.getResultRender();
-                       // create evaluator instance
-                       pe = (PolicyEvaluator<T>) evalCls
-                                       .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
-                                       .newInstance(config, context, policyDef, sourceStreams, needValidation);
-            if (pe.isMarkdownEnabled()) // updating markdown details only if the policy is found invalid
-                updateMarkdownDetails(alertDef, pe.isMarkdownEnabled(), pe.getMarkdownReason());
-        } catch(Exception ex) {
-                       LOG.error("Fail creating new policyEvaluator", ex);
-                       LOG.warn("Broken policy definition and stop running : " 
+ alertDef.getPolicyDef());
-                       throw new IllegalStateException(ex);
-               }
-               return pe;
-       }
-
-    /**
-     * verify both alertExecutor logic name and partition id
-     * @param alertDef alert definition
-     *
-     * @return whether accept the alert definition
-     */
-       private boolean accept(T alertDef){
-               String executorID = alertDef.getTags().containsKey("executorId") ? alertDef.getTags().get("executorId")
-                               : alertDef.getTags().get("alertExecutorId");
-
-               if(!executorID.equals(executorId)) {
-            if(LOG.isDebugEnabled()){
-                LOG.debug("alertDef does not belong to this alertExecutorId : 
" + executorId + ", alertDef : " + alertDef);
-            }
-            return false;
-        }
-               int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
-               if(targetPartitionSeq == partitionSeq)
-                       return true;
-               return false;
-       }
-
-       private void updateCounter(String name, Map<String, String> dimensions, double value) {
-               long current = System.currentTimeMillis();
-               String metricName = MetricKeyCodeDecoder.codeMetricKey(name, dimensions);
-               if (registry.getMetrics().get(metricName) == null) {
-                       EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, MERITE_GRANULARITY);
-                       metric.registerListener(listener);
-                       registry.register(metricName, metric);
-               } else {
-                       EagleCounterMetric metric = (EagleCounterMetric) registry.getMetrics().get(metricName);
-                       metric.update(value, current);
-                       // TODO: need remove unused metric from registry
-               }
-       }
-       
-       private void updateCounter(String name, Map<String, String> dimensions) {
-               updateCounter(name, dimensions, 1.0);
-       }
-       
-       protected Map<String, String> getDimensions(String policyId) {
-               if (dimensionsMap.get(policyId) == null) {
-                       Map<String, String> newDimensions = new HashMap<String, String>(baseDimensions);
-                       newDimensions.put(Constants.POLICY_ID, policyId);
-                       dimensionsMap.put(policyId, newDimensions);
-               }
-               return dimensionsMap.get(policyId);
-       }
-       
-    /**
-     * within this single executor, execute all PolicyEvaluator sequentially
-     * the contract for input:
-     * 1. total # of fields for input is 3, which is fixed
-     * 2. the first field is key
-     * 3. the second field is stream name
-     * 4. the third field is value which is java SortedMap
-     */
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, K>> outputCollector){
-        if(input.size() != 3)
-            throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
-        if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
-        if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
-
-        updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
-        try{
-            synchronized(this.policyEvaluators) {
-                for(Entry<String, PolicyEvaluator<T>> entry : policyEvaluators.entrySet()){
-                    String policyId = entry.getKey();
-                    PolicyEvaluator<T> evaluator = entry.getValue();
-                    if (!evaluator.isMarkdownEnabled()) { // not evaluated for a marked down policy
-                        updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
-                        try {
-                            evaluator.evaluate(new ValuesArray(outputCollector, input.get(1), input.get(2)));
-                        } catch (Exception ex) {
-                            LOG.error("Got an exception, but continue to run " 
+ input.get(2).toString(), ex);
-                            updateCounter(EAGLE_POLICY_EVAL_COUNT, 
getDimensions(policyId));
-                        }
-                    }
-                }
-            }
-        } catch(Exception ex){
-            LOG.error(executorId + ", partition " + partitionSeq + ", error 
fetching alerts, but continue to run", ex);
-            updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
-        }
-    }
-
-       @Override
-       public void onPolicyCreated(Map<String, T> added) {
-               if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
-               for(T alertDef : added.values()){
-                       if(!accept(alertDef))
-                               continue;
-                       LOG.info(executorId + ", partition " + partitionSeq + " policy really added " + alertDef);
-                       PolicyEvaluator<T> newEvaluator = createPolicyEvaluator(alertDef);
-                       if(newEvaluator != null){
-                               synchronized(this.policyEvaluators) {
-                                       policyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), newEvaluator);
-                               }
-                       }
-               }
-       }
-
-    @Override
-    public void onPolicyChanged(Map<String, T> changed) {
-        if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy changed : " + changed);
-        for(T alertDef : changed.values()){
-            if(!accept(alertDef))
-                continue;
-            LOG.info(executorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
-            synchronized(this.policyEvaluators) {
-                PolicyEvaluator<T> pe = policyEvaluators.get(alertDef.getTags().get(Constants.POLICY_ID));
-                boolean previousMarkdown = pe.isMarkdownEnabled();
-                String previousMarkdownReason = pe.getMarkdownReason();
-                pe.onPolicyUpdate(alertDef);
-                if (isMarkdownUpdateRequired(previousMarkdown, pe.isMarkdownEnabled(), previousMarkdownReason, pe.getMarkdownReason()))
-                    updateMarkdownDetails(alertDef, pe.isMarkdownEnabled(), pe.getMarkdownReason());
-            }
-        }
-    }
-
-       @Override
-       public void onPolicyDeleted(Map<String, T> deleted) {
-               if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
-               for(T alertDef : deleted.values()){
-                       if(!accept(alertDef))
-                               continue;
-                       LOG.info(executorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
-                       String policyId = alertDef.getTags().get(Constants.POLICY_ID);
-                       synchronized(this.policyEvaluators) {
-                               if (policyEvaluators.containsKey(policyId)) {
-                                       PolicyEvaluator<T> pe = policyEvaluators.remove(alertDef.getTags().get(Constants.POLICY_ID));
-                                       pe.onPolicyDelete();
-                               }
-                       }
-               }
-       }
-
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       public void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts) {
-               if(alerts != null && !alerts.isEmpty()){
-                       String policyId = context.policyId;
-            LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
-                       Collector outputCollector = context.outputCollector;
-                       PolicyEvaluator<T> evaluator = context.evaluator;
-                       updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), alerts.size());
-                       for (K entity : alerts) {
-                               synchronized(this) {
-                                       outputCollector.collect(new Tuple2(policyId, entity));
-                               }
-                               if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
-                       }
-               }
-       }
-
-       public abstract ResultRender<T, K> getResultRender();
-
-    @Override
-    public void report() {
-        PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
-        appender.reportPolicyMembership(executorId + "_" + partitionSeq, policyEvaluators.keySet());
-    }
-
-    /**
-     * Method to check if updating markdown details in DB is required.
-     * @return boolean: If markdown details needs to be updated for the policy
-     */
-    private boolean isMarkdownUpdateRequired (boolean previousMarkdown, boolean presentMarkdown, String previousReason, String presentReason) {
-        boolean isUpdateRequired = true;
-        if (!previousMarkdown && !presentMarkdown) { // not updating when previous/present policies are both valid
-            isUpdateRequired = false;
-        } else if (previousMarkdown && presentMarkdown) {
-            if (previousReason.equals(presentReason))
-                isUpdateRequired = false; // not updating when there is no change with the markdown reason
-        }
-        return isUpdateRequired;
-    }
-
-    /**
-     * Method to invoke Eagle Service call to update the markdown details for the policy.
-     */
-    private void updateMarkdownDetails(T entity, boolean markdownEnabled, String markdownReason) {
-        AlertDefinitionAPIEntity alertEntity = (AlertDefinitionAPIEntity) entity;
-        alertEntity.setMarkdownEnabled(markdownEnabled);
-        alertEntity.setMarkdownReason(null != markdownReason ? markdownReason : "");
-        policyDefinitionDao.updatePolicyDetails(entity);
-    }
-}

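For reference: the deleted flatMap() above documents a fixed three-field input contract, where the first field is a partition key, the second the stream name, and the third a java.util.SortedMap of attribute name/value pairs. A minimal sketch of one such input tuple; the stream and attribute names here are hypothetical, not taken from the commit:

import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class FlatMapInputSketch {
    // Builds the 3-element list PolicyProcessExecutor.flatMap() expects:
    // [0] partition key, [1] stream name, [2] SortedMap of event attributes.
    public static List<Object> buildInput() {
        SortedMap<String, Object> attrs = new TreeMap<>();
        attrs.put("timestamp", 1453267200000L); // hypothetical attribute
        attrs.put("user", "hadoop");            // hypothetical attribute
        return Arrays.asList("someKey", "hdfsAuditLogEventStream", attrs);
    }
}
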
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
deleted file mode 100644
index 22506aa..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Siddhi stream call back
- *
- * Created on 1/20/16.
- */
-public class SiddhiOutputStreamCallback<T extends AbstractPolicyDefinitionEntity, K> extends StreamCallback {
-
-    public static final Logger LOG = LoggerFactory.getLogger(SiddhiOutputStreamCallback.class);
-
-    private SiddhiPolicyEvaluator<T, K> evaluator;
-    public Config config;
-
-    public SiddhiOutputStreamCallback(Config config, SiddhiPolicyEvaluator<T, K> evaluator) {
-        this.config = config;
-        this.evaluator = evaluator;
-    }
-
-    @Override
-    public void receive(Event[] events) {
-        long timeStamp = System.currentTimeMillis();
-        List<K> alerts = new LinkedList<>();
-        PolicyEvaluationContext<T, K> siddhiContext =  null;
-
-        for (Event event : events) {
-            Object[] data = event.getData();
-            if (siddhiContext == null) {
-                siddhiContext = (PolicyEvaluationContext<T, K>) data[0];
-            }
-            List<Object> returns = SiddhiQueryCallbackImpl.getOutputObject(event.getData());
-            K alert = siddhiContext.resultRender.render(config, returns, siddhiContext, timeStamp);
-            alerts.add(alert);
-        }
-
-        if (siddhiContext != null) {
-            siddhiContext.alertExecutor.onEvalEvents(siddhiContext, alerts);
-        }
-    }
-}

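SiddhiOutputStreamCallback above is a Siddhi 3.x StreamCallback; the commented-out block in SiddhiPolicyEvaluator.attachCallback() further below shows where such a callback would be registered. A self-contained sketch of that wiring, using the HeapUsage stream definition from the SiddhiStreamMetadataUtils javadoc; the filter and event values are invented for illustration:

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class StreamCallbackWiringSketch {
    public static void main(String[] args) throws Exception {
        String plan = "define stream HeapUsage (metric string, host string, value double, timestamp long); "
                + "@info(name = 'query') from HeapUsage[value >= 0.8] "
                + "select metric, host, value insert into GCMonitor;";
        SiddhiManager manager = new SiddhiManager();
        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(plan);
        // Every event emitted into the GCMonitor output stream reaches this callback.
        runtime.addCallback("GCMonitor", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event e : events) {
                    System.out.println("alert event: " + java.util.Arrays.toString(e.getData()));
                }
            }
        });
        runtime.start();
        runtime.getInputHandler("HeapUsage")
               .send(new Object[]{"eagle.metric.heap", "host1", 0.95, System.currentTimeMillis()});
        Thread.sleep(100); // give the asynchronous event a moment to flow through
        manager.shutdown();
    }
}
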
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
deleted file mode 100644
index 639c15b..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-
-/**
- * siddhi policy definition has the following format
- * {
-        "type":"SiddhiCEPEngine",
-               "expression" : "from every b1=HeapUsage[metric == 
'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> 
b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 
0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as 
timerange insert into GCMonitor; "
-       }
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
-       private String expression;
-
-       private boolean containsDefinition;
-
-       public boolean isContainsDefinition() {
-               return containsDefinition;
-       }
-
-       public void setContainsDefinition(boolean containsDefinition) {
-               this.containsDefinition = containsDefinition;
-       }
-
-       public String getExpression() {
-               return expression;
-       }
-       public void setExpression(String expression) {
-               this.expression = expression;
-       }
-       
-       @Override
-       public String toString(){
-               return expression;
-       }
-}

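SiddhiPolicyDefinition above is bound to the policy type returned by SiddhiPolicyEvaluatorServiceProviderImpl (further below), and PolicyProcessExecutor.createPolicyEvaluator() deserializes it through JsonSerDeserUtils. A hedged sketch of that round trip, assuming the Eagle policy modules are registered on the classpath; the type string must match the provider's getPolicyType(), and the stream name and filter in the expression are invented for illustration:

import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.policy.PolicyManager;
import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import org.apache.eagle.policy.siddhi.SiddhiPolicyDefinition;

public class SiddhiPolicyDefSketch {
    public static void main(String[] args) throws Exception {
        String policyType = "siddhiCEPEngine"; // Constants.policyType.siddhiCEPEngine.name()
        String json = "{\"type\":\"siddhiCEPEngine\",\"expression\":"
                + "\"from hdfsAuditLogEventStream[cmd == 'delete'] select * insert into outputStream;\"}";
        // Same call chain PolicyProcessExecutor.createPolicyEvaluator() uses above.
        AbstractPolicyDefinition def = JsonSerDeserUtils.deserialize(json,
                AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(policyType));
        System.out.println(((SiddhiPolicyDefinition) def).getExpression());
    }
}
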
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
deleted file mode 100644
index cbee286..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.lmax.disruptor.ExceptionHandler;
-import com.lmax.disruptor.FatalExceptionHandler;
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
-import org.apache.eagle.dataproc.core.ValuesArray;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.policy.PolicyEvaluator;
-import org.apache.eagle.policy.PolicyManager;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.config.AbstractPolicyDefinition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
-
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.*;
-
-/**
- * when policy is updated or deleted, SiddhiManager.shutdown should be invoked to release resources.
- * during this time, synchronization is important
- */
-public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T> {
-
-    private final static String EXECUTION_PLAN_NAME = "query";
-    private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
-
-    private volatile SiddhiRuntime siddhiRuntime;
-    private final String[] sourceStreams;
-    private final boolean needValidation;
-    private final Config config;
-    private final PolicyEvaluationContext<T, K> context;
-
-    /**
-     * everything dependent on policyDef should be together and switched in runtime
-     */
-    public static class SiddhiRuntime {
-        QueryCallback queryCallback;
-        Map<String, InputHandler> siddhiInputHandlers;
-        SiddhiManager siddhiManager;
-        SiddhiPolicyDefinition policyDef;
-        List<String> outputFields;
-        String executionPlanName;
-        boolean markdownEnabled;
-        String markdownReason;
-    }
-
-    public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams) {
-        this(config, context, policyDef, sourceStreams, false);
-    }
-
-    public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation) {
-        this.config = config;
-        this.context = context;
-        this.context.evaluator = this;
-        this.needValidation = needValidation;
-        this.sourceStreams = sourceStreams;
-        init(policyDef);
-    }
-
-    public void init(AbstractPolicyDefinition policyDef) {
-        siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef);
-    }
-
-    public static String addContextFieldIfNotExist(String expression) {
-        // select fieldA, fieldB --> select eagleAlertContext, fieldA, fieldB
-        int pos = expression.indexOf("select ") + 7;
-        int index = pos;
-        boolean isSelectStarPattern = true;
-        while (index < expression.length()) {
-            if (expression.charAt(index) == ' ') index++;
-            else if (expression.charAt(index) == '*') break;
-            else {
-                isSelectStarPattern = false;
-                break;
-            }
-        }
-        if (isSelectStarPattern) return expression;
-        StringBuilder sb = new StringBuilder();
-        sb.append(expression.substring(0, pos));
-        sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ",");
-        sb.append(expression.substring(pos, expression.length()));
-        return sb.toString();
-    }
-
-    private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef) {
-        SiddhiManager siddhiManager = new SiddhiManager();
-        Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
-        SiddhiRuntime runtime = new SiddhiRuntime();
-
-        // compose execution plan sql
-        String executionPlan = policyDef.getExpression();
-        if (!policyDef.isContainsDefinition()) {
-            StringBuilder sb = new StringBuilder();
-            for (String sourceStream : sourceStreams) {
-                String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
-                LOG.info("Siddhi stream definition : " + streamDef);
-                sb.append(streamDef);
-            }
-
-            String expression = policyDef.getExpression();
-            executionPlan = sb.toString() + " @info(name = '" + 
EXECUTION_PLAN_NAME + "') " + expression;
-        }
-
-        ExecutionPlanRuntime executionPlanRuntime = null;
-
-        try {
-            executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-            executionPlanRuntime.handleExceptionWith(new SiddhiPolicyExceptionHandler());
-
-            for (String sourceStream : sourceStreams) {
-                siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
-            }
-
-            executionPlanRuntime.start();
-            LOG.info("Siddhi query: " + executionPlan);
-            attachCallback(runtime, executionPlanRuntime, context);
-
-            runtime.markdownEnabled = false;
-            runtime.markdownReason = null;
-        } catch (SiddhiParserException exception) { // process is not interrupted in case of an invalid policy defined by marking down
-            LOG.error("Exception in parsing Siddhi query: " + executionPlan + ", reason being: " + exception.getMessage());
-            runtime.queryCallback = null;
-            runtime.outputFields = null;
-            runtime.markdownEnabled = true;
-            runtime.markdownReason = exception.getMessage();
-        }
-
-        runtime.siddhiInputHandlers = siddhiInputHandlers;
-        runtime.siddhiManager = siddhiManager;
-        runtime.policyDef = policyDef;
-        runtime.executionPlanName = (null != executionPlanRuntime) ? executionPlanRuntime.getName() : null; // executionPlanRuntime will be set to null in case of an invalid policy
-        return runtime;
-    }
-
-    private void attachCallback(SiddhiRuntime runtime, ExecutionPlanRuntime executionPlanRuntime, PolicyEvaluationContext<T, K> context) {
-        List<String> outputFields = new ArrayList<>();
-//        String outputStreamName = config.getString("alertExecutorConfigs." + executorId + "." + "outputStream");
-//        if (StringUtils.isNotEmpty(outputStreamName)) {
-//            StreamCallback streamCallback = new SiddhiOutputStreamCallback<>(config, this);
-//            executionPlanRuntime.addCallback(outputStreamName, streamCallback);
-//            runtime.outputStreamCallback = streamCallback;
-//            // find output attribute from stream call back
-//            try {
-//                Field field = StreamCallback.class.getDeclaredField("streamDefinition");
-//                field.setAccessible(true);
-//                AbstractDefinition outStreamDef = (AbstractDefinition) field.get(streamCallback);
-//                outputFields = Arrays.asList(outStreamDef.getAttributeNameArray());
-//            } catch (Exception ex) {
-//                LOG.error("Got an Exception when initial outputFields ", ex);
-//            }
-//        } else {
-            QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, context);
-            executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
-            runtime.queryCallback = callback;
-            // find output attribute from query call back
-            try {
-                Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME);
-                field.setAccessible(true);
-                Query query = (Query) field.get(callback);
-                List<OutputAttribute> list = query.getSelector().getSelectionList();
-                for (OutputAttribute output : list) {
-                    outputFields.add(output.getRename());
-                }
-            } catch (Exception ex) {
-                LOG.error("Got an Exception when initial outputFields ", ex);
-            }
-//        }
-        runtime.outputFields = outputFields;
-    }
-
-    /**
-     * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
-     * 2. runtime check for input data (This is very expensive, so we ignore for now)
-     * the size of input map should be equal to size of attributes which stream metadata defines
-     * the attribute names should be equal to attribute names which stream metadata defines
-     * the input field cannot be null
-     */
-    @SuppressWarnings({"rawtypes"})
-    @Override
-    public void evaluate(ValuesArray data) throws Exception {
-        if (!siddhiRuntime.markdownEnabled) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Siddhi policy evaluator consumers data :" + data);
-            }
-            Collector outputCollector = (Collector) data.get(0);
-            String streamName = (String) data.get(1);
-            SortedMap dataMap = (SortedMap) data.get(2);
-
-            // Get metadata keyset for the stream.
-            Set<String> metadataKeys = StreamMetadataManager.getInstance()
-                    .getMetadataEntityMapForStream(streamName).keySet();
-
-            validateEventInRuntime(streamName, dataMap, metadataKeys);
-
-            synchronized (siddhiRuntime) {
-                // retain the collector in the context. This assignment is idempotent
-                context.outputCollector = outputCollector;
-
-                List<Object> input = new ArrayList<Object>();
-                putAttrsIntoInputStream(input, streamName, metadataKeys, dataMap);
-                siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
-            }
-        }
-    }
-
-    /**
-     * This is a heavy operation, we should avoid to use.
-     * <p/>
-     * This validation method will skip invalid fields in event which are not declared in stream schema otherwise it will cause exception for siddhi engine.
-     *
-     * @param sourceStream source steam id
-     * @param data         input event
-     * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a>
-     */
-    private void validateEventInRuntime(String sourceStream, SortedMap data, Set<String> metadataKeys) {
-        if (!needValidation) {
-            return;
-        }
-
-        if (!metadataKeys.equals(data.keySet())) {
-            Set<Object> badKeys = new TreeSet<>();
-            for (Object key : data.keySet()) {
-                if (!metadataKeys.contains(key)) {
-                    badKeys.add(key);
-                }
-            }
-            LOG.warn(String.format("Ignore invalid fields %s in event: %s from 
stream: %s, valid fields are: %s",
-                    badKeys.toString(), data.toString(), sourceStream, 
metadataKeys.toString()));
-
-            for (Object key : badKeys) {
-                data.remove(key);
-            }
-        }
-    }
-
-    private void putAttrsIntoInputStream(List<Object> input, String streamName, Set<String> metadataKeys, SortedMap dataMap) {
-        if (!needValidation) {
-            input.addAll(dataMap.values());
-            return;
-        }
-
-        // If a metadata field is not set, we put null for the field's value.
-        for (String key : metadataKeys) {
-            Object value = dataMap.get(key);
-            if (value == null) {
-                input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, key));
-            } else {
-                input.add(value);
-            }
-        }
-    }
-
-    @Override
-    public void onPolicyUpdate(T newAlertDef) {
-        AbstractPolicyDefinition policyDef = null;
-        try {
-            policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
-                    AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE)));
-        } catch (Exception ex) {
-            LOG.error("Initial policy def error, ", ex);
-        }
-        SiddhiRuntime previous = siddhiRuntime;
-        siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef);
-        synchronized (previous) {
-            if (!previous.markdownEnabled) // condition to check if previous SiddhiRuntime was started after policy validation
-                previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
-        }
-    }
-
-    @Override
-    public void onPolicyDelete() {
-        synchronized (siddhiRuntime) {
-            LOG.info("Going to shutdown siddhi execution plan, planName: " + 
siddhiRuntime.executionPlanName);
-            if (!siddhiRuntime.markdownEnabled) // condition to check if 
previous SiddhiRuntime was started after policy validation
-                
siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown();
-            LOG.info("Siddhi execution plan " + 
siddhiRuntime.executionPlanName + " is successfully shutdown ");
-        }
-    }
-
-    @Override
-    public String toString() {
-        return siddhiRuntime.policyDef.toString();
-    }
-
-    public String[] getStreamNames() {
-        return sourceStreams;
-    }
-
-    public Map<String, String> getAdditionalContext() {
-        Map<String, String> context = new HashMap<String, String>();
-        StringBuilder sourceStreams = new StringBuilder();
-        for (String streamName : getStreamNames()) {
-            sourceStreams.append(streamName + ",");
-        }
-        if (sourceStreams.length() > 0) {
-            sourceStreams.deleteCharAt(sourceStreams.length() - 1);
-        }
-        context.put(Constants.SOURCE_STREAMS, sourceStreams.toString());
-        context.put(Constants.POLICY_ID, this.context.policyId);
-        return context;
-    }
-
-    public List<String> getOutputStreamAttrNameList() {
-        return siddhiRuntime.outputFields;
-    }
-
-    @Override
-    public boolean isMarkdownEnabled() { return siddhiRuntime.markdownEnabled; }
-
-    @Override
-    public String getMarkdownReason() { return siddhiRuntime.markdownReason; }
-
-    private static class SiddhiPolicyExceptionHandler implements Serializable, ExceptionHandler<Object> {
-        private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyExceptionHandler.class);
-
-        public void handleEventException(Throwable ex, long sequence, Object event) {
-            LOG.warn("Exception processing event: " + sequence + " " + event, ex);
-        }
-
-        public void handleOnStartException(Throwable ex) {
-            LOG.warn("Exception during onStart()", ex);
-        }
-
-        public void handleOnShutdownException(Throwable ex) {
-            LOG.warn("Exception during onShutdown()", ex);
-        }
-    }
-}
\ No newline at end of file

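The static helper addContextFieldIfNotExist() deleted above injects the eagleAlertContext field into a query's select clause unless the query selects *. A small usage sketch; the two queries are examples, not taken from the commit:

import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;

public class ContextFieldSketch {
    public static void main(String[] args) {
        String q1 = "from HeapUsage select metric, host insert into GCMonitor;";
        String q2 = "from HeapUsage select * insert into GCMonitor;";
        // prints: from HeapUsage select eagleAlertContext,metric, host insert into GCMonitor;
        System.out.println(SiddhiPolicyEvaluator.addContextFieldIfNotExist(q1));
        // printed unchanged: the select-star pattern is detected and left alone
        System.out.println(SiddhiPolicyEvaluator.addContextFieldIfNotExist(q2));
    }
}
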
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
deleted file mode 100644
index 1bd5830..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-public class SiddhiPolicyEvaluatorServiceProviderImpl<T extends AbstractPolicyDefinitionEntity> implements PolicyEvaluatorServiceProvider<T> {
-       @Override
-       public String getPolicyType() {
-               return Constants.policyType.siddhiCEPEngine.name();
-       }
-
-       @Override
-       public Class getPolicyEvaluator() {
-               return SiddhiPolicyEvaluator.class;
-       }
-
-       @Override
-       public List<Module> getBindingModules() {
-               Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(SiddhiPolicyDefinition.class, getPolicyType()));
-               return Arrays.asList(module1);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
deleted file mode 100644
index 43422f8..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Siddhi call back implementation
- *
- * @param <T> - The policy definition type
- * @param <K> - K the alert entity type
- */
-public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K> extends QueryCallback{
-
-       private static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
-
-       private final Config config;
-       private final PolicyEvaluationContext<T, K> siddhiEvaluateContext;
-
-       public SiddhiQueryCallbackImpl(Config config, PolicyEvaluationContext<T, K> siddhiContext) {
-               this.config = config;
-               this.siddhiEvaluateContext = siddhiContext;
-       }
-       
-       public static List<String> convertToString(List<Object> data) {
-               List<String> rets = new ArrayList<String>();
-               for (Object object : data) {
-                       String value = null;
-                       if (object instanceof Double) {
-                               value = String.valueOf((Double)object);
-                       }
-                       else if (object instanceof Integer) {
-                               value = String.valueOf((Integer)object);
-                       }
-                       else if (object instanceof Long) {
-                               value = String.valueOf((Long)object);
-                       }
-                       else if (object instanceof String) {
-                               value = (String)object;
-                       }
-                       else if (object instanceof Boolean) {
-                               value = String.valueOf((Boolean)object);
-                       }
-                       rets.add(value);
-               }
-               return rets;
-       }
-
-       public static List<Object> getOutputObject(Object[] data) {
-               List<Object> rets = new ArrayList<>(data.length);
-//             boolean isFirst = true;
-               for (Object object : data) {
-//                     // The first field is siddhiAlertContext, skip it
-//                     if (isFirst) {
-//                             isFirst = false;
-//                             continue;
-//                     }
-                       rets.add(object);
-               }
-               return rets;
-       }
-       
-       @SuppressWarnings("unchecked")
-       @Override
-       public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-               List<Object> rets = getOutputObject(inEvents[0].getData());
-               K alert = siddhiEvaluateContext.resultRender.render(config, rets, siddhiEvaluateContext, timeStamp);
-               SiddhiEvaluationHandler<T, K> handler = siddhiEvaluateContext.alertExecutor;
-               handler.onEvalEvents(siddhiEvaluateContext, Arrays.asList(alert));
-       }
-}

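SiddhiQueryCallbackImpl above extends Siddhi's QueryCallback, which SiddhiPolicyEvaluator.attachCallback() binds to the query named by @info(name = 'query'). A minimal sketch of that binding outside Eagle; the stream definition comes from the SiddhiStreamMetadataUtils javadoc, while the filter and values are invented:

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;

public class QueryCallbackSketch {
    public static void main(String[] args) throws Exception {
        String plan = "define stream HeapUsage (metric string, host string, value double, timestamp long); "
                + "@info(name = 'query') from HeapUsage[value >= 0.8] "
                + "select metric, host, value insert into GCMonitor;";
        SiddhiManager manager = new SiddhiManager();
        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(plan);
        // Bound by the query name declared in @info, as attachCallback() does with EXECUTION_PLAN_NAME.
        runtime.addCallback("query", new QueryCallback() {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                System.out.println("matched at " + timeStamp + ": " + java.util.Arrays.toString(inEvents[0].getData()));
            }
        });
        runtime.start();
        runtime.getInputHandler("HeapUsage")
               .send(new Object[]{"eagle.metric.heap", "host1", 0.9, System.currentTimeMillis()});
        Thread.sleep(100); // allow the asynchronous event to be processed
        manager.shutdown();
    }
}
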
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
deleted file mode 100644
index e4c3481..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.SortedMap;
-
-/**
- * Converts the metadata entities of a stream into a stream definition for the Siddhi CEP engine,
- * e.g. define stream HeapUsage (metric string, host string, value double, timestamp long)
- */
-public class SiddhiStreamMetadataUtils {
-       private final static Logger LOG = 
LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class);
-       
-       public final static String EAGLE_ALERT_CONTEXT_FIELD = 
"eagleAlertContext";
-
-       public static SortedMap<String, AlertStreamSchemaEntity> 
getAttrMap(String streamName) {
-               SortedMap<String, AlertStreamSchemaEntity> map = 
StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
-               if(map == null || map.size() == 0){
-                       throw new IllegalStateException("Alert stream schema 
["+streamName+"] should never be empty");
-               }
-               return map;
-       }
-
-       /**
-     * @see org.wso2.siddhi.query.api.definition.Attribute.Type
-     * make sure StreamMetadataManager.init is invoked before this method
-        * @param streamName
-        * @return
-        */
-       public static String convertToStreamDef(String streamName){
-               SortedMap<String, AlertStreamSchemaEntity> map = 
getAttrMap(streamName);
-               StringBuilder sb = new StringBuilder();
-//             sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object, ");
-               for(Map.Entry<String, AlertStreamSchemaEntity> entry : 
map.entrySet()){
-            appendAttributeNameType(sb, entry.getKey(), 
entry.getValue().getAttrType());
-               }
-               if(sb.length() > 0){
-                       sb.deleteCharAt(sb.length()-1);
-               }
-               
-               String siddhiStreamDefFormat = "define stream " + streamName + 
"(" + "%s" + ");";
-               return String.format(siddhiStreamDefFormat, sb.toString());
-       }
-
-    public static String convertToStreamDef(String streamName, Map<String, 
String> eventSchema){
-        StringBuilder sb = new StringBuilder();
-        sb.append("context" + " object,");
-        for(Map.Entry<String, String> entry : eventSchema.entrySet()){
-            appendAttributeNameType(sb, entry.getKey(), entry.getValue());
-        }
-        if(sb.length() > 0){
-            sb.deleteCharAt(sb.length()-1);
-        }
-
-        String siddhiStreamDefFormat = "define stream " + streamName + "(" + 
"%s" + ");";
-        return String.format(siddhiStreamDefFormat, sb.toString());
-    }
-
-    private static void appendAttributeNameType(StringBuilder sb, String 
attrName, String attrType){
-        sb.append(attrName);
-        sb.append(" ");
-        if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
-            sb.append("string");
-        }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
-            sb.append("int");
-        }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
-            sb.append("long");
-        }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
-            sb.append("bool");
-        }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){
-            sb.append("float");
-        }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){
-            sb.append("double");
-        }else{
-            LOG.warn("AttrType is not recognized, ignore : " + attrType);
-        }
-        sb.append(",");
-    }
-
-       public static Object getAttrDefaultValue(String streamName, String 
attrName){
-               SortedMap<String, AlertStreamSchemaEntity> map = 
getAttrMap(streamName);
-               AlertStreamSchemaEntity entity = map.get(attrName);
-               if (entity.getDefaultValue() != null) {
-                       return entity.getDefaultValue();
-               }
-               else {
-                       String attrType = entity.getAttrType();
-                       if 
(attrType.equalsIgnoreCase(AttributeType.STRING.name())) {
-                               return "NA";
-                       } else if 
(attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || 
attrType.equalsIgnoreCase(AttributeType.LONG.name())) {
-                               return -1;
-                       } else if 
(attrType.equalsIgnoreCase(AttributeType.BOOL.name())) {
-                               return true;
-                       } else {
-                               LOG.warn("AttrType is not recognized: " + 
attrType + ", treat it as string");
-                               return "N/A";
-                       }
-               }
-       }
-}
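
Both convertToStreamDef overloads concatenate attrName/attrType pairs and then strip the trailing comma with deleteCharAt. On Java 8 a StringJoiner expresses the same thing without the cleanup step; a minimal sketch, assuming the attribute types have already been mapped to Siddhi keywords (string/int/long/bool/float/double) the way appendAttributeNameType does:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.StringJoiner;

    public final class StreamDefBuilder {

        private StreamDefBuilder() {
        }

        // Joins "attrName attrType" pairs with commas, so no trailing comma is ever produced.
        public static String convertToStreamDef(String streamName, Map<String, String> attrNameToSiddhiType) {
            StringJoiner fields = new StringJoiner(",");
            for (Map.Entry<String, String> entry : attrNameToSiddhiType.entrySet()) {
                fields.add(entry.getKey() + " " + entry.getValue());
            }
            return "define stream " + streamName + "(" + fields + ");";
        }

        public static void main(String[] args) {
            Map<String, String> attrs = new LinkedHashMap<String, String>();
            attrs.put("metric", "string");
            attrs.put("host", "string");
            attrs.put("value", "double");
            attrs.put("timestamp", "long");
            // Prints: define stream HeapUsage(metric string,host string,value double,timestamp long);
            System.out.println(convertToStreamDef("HeapUsage", attrs));
        }
    }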

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
deleted file mode 100644
index 83d30e0..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import com.typesafe.config.Config;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.map.UnmodifiableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Centralized in-memory store where all stream metadata resides; the data is not mutable.
- */
-public class StreamMetadataManager {
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamMetadataManager.class);
-       
-       private static StreamMetadataManager instance = new 
StreamMetadataManager();
-       private Map<String, List<AlertStreamSchemaEntity>> map = new 
HashMap<String, List<AlertStreamSchemaEntity>>();
-       private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = 
new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>();
-       private volatile boolean initialized = false;
-       
-       private StreamMetadataManager(){
-       }
-       
-       public static StreamMetadataManager getInstance(){
-               return instance;
-       }
-       
-       private void internalInit(Config config, AlertStreamSchemaDAO dao){
-               try{
-                       String application = 
config.getString(EagleConfigConstants.EAGLE_PROPS + "." + 
EagleConfigConstants.APPLICATION);
-                       List<AlertStreamSchemaEntity> list = 
dao.findAlertStreamSchemaByApplication(application);
-                       if(list == null)
-                               return;
-                       for (AlertStreamSchemaEntity entity : list) {
-                               String streamName = 
entity.getTags().get(Constants.STREAM_NAME);
-                               if (map.get(streamName) == null) {
-                                       map.put(streamName, new 
ArrayList<AlertStreamSchemaEntity>());
-                                       map2.put(streamName, new 
TreeMap<String, AlertStreamSchemaEntity>());
-                               }
-                               map.get(streamName).add(entity);
-                               
map2.get(streamName).put(entity.getTags().get(Constants.ATTR_NAME), entity);
-                       }
-               }catch(Exception ex){
-                       LOG.error("Failed to build metadata manager", ex);
-                       throw new IllegalStateException(ex);
-               }
-       }
-       
-       /**
-        * A singleton with an explicit init is convenient for unit tests as well, and it
-        * ensures that initialization happens exactly once before the manager is used.
-        * @param config
-        * @param dao
-        */
-       public void init(Config config, AlertStreamSchemaDAO dao){
-               if(!initialized){
-                       synchronized(this){
-                               if(!initialized){
-                    if(LOG.isDebugEnabled()) LOG.debug("Initializing ...");
-                                       internalInit(config, dao);
-                                       initialized = true;
-                    LOG.info("Successfully initialized");
-                               }
-                       }
-               }else{
-            LOG.info("Already initialized, skip");
-        }
-       }
-
-       // Only for unit test purpose
-       public void reset() {
-               synchronized (this) {
-                       initialized = false;
-                       map.clear();
-                       map2.clear();
-               }
-       }
-
-       private void ensureInitialized(){
-               if(!initialized)
-                       throw new IllegalStateException("StreamMetadataManager 
should be initialized before using it");
-       }
-       
-       public List<AlertStreamSchemaEntity> 
getMetadataEntitiesForStream(String streamName){
-               ensureInitialized();
-               return getMetadataEntitiesForAllStreams().get(streamName);
-       }
-       
-       public Map<String, List<AlertStreamSchemaEntity>> 
getMetadataEntitiesForAllStreams(){
-               ensureInitialized();
-               return UnmodifiableMap.decorate(map);
-       }
-       
-       public SortedMap<String, AlertStreamSchemaEntity> 
getMetadataEntityMapForStream(String streamName){
-               ensureInitialized();
-               return getMetadataEntityMapForAllStreams().get(streamName);
-       }
-       
-       public Map<String, SortedMap<String, AlertStreamSchemaEntity>> 
getMetadataEntityMapForAllStreams(){
-               ensureInitialized();
-               return UnmodifiableMap.decorate(map2);
-       }
-}
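
init() above is the double-checked initialization idiom; it is only thread-safe because the initialized flag is volatile, so the write performed after internalInit() happens-before any unsynchronized read that observes it as true. A generic Java 8 sketch of the same idiom (class and member names hypothetical):

    import java.util.function.Supplier;

    // Generic double-checked lazy initializer, mirroring the volatile-flag pattern
    // of StreamMetadataManager.init(): publishing through a volatile field is what
    // makes the fully built state visible to threads that skip the lock.
    public final class LazyInit<T> {

        private final Supplier<T> supplier;
        private volatile T value; // volatile publication is the key ingredient

        public LazyInit(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        public T get() {
            T local = value; // single volatile read on the fast path
            if (local == null) {
                synchronized (this) {
                    local = value;
                    if (local == null) {
                        local = supplier.get();
                        value = local; // publish only after construction completes
                    }
                }
            }
            return local;
        }
    }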

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index eac2bfd..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
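
This services file is consumed through the JDK ServiceLoader mechanism: the resource must be named after the fully qualified interface (here org.apache.eagle.policy.PolicyEvaluatorServiceProvider) and list one implementation class per line, each instantiated through its public no-arg constructor; the missing trailing newline is harmless. A minimal, self-contained sketch with a stand-in interface:

    import java.util.ServiceLoader;

    public class ProviderLookupDemo {

        // Stand-in for org.apache.eagle.policy.PolicyEvaluatorServiceProvider; the real
        // interface lives in eagle-policy-base and is not redeclared here.
        public interface PolicyEvaluatorServiceProvider {
        }

        public static void main(String[] args) {
            // ServiceLoader scans every META-INF/services/<interface FQCN> resource on the
            // classpath and instantiates each listed class lazily during iteration.
            ServiceLoader<PolicyEvaluatorServiceProvider> loader =
                    ServiceLoader.load(PolicyEvaluatorServiceProvider.class);
            for (PolicyEvaluatorServiceProvider provider : loader) {
                System.out.println("Discovered provider: " + provider.getClass().getName());
            }
        }
    }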

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
 
b/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
deleted file mode 100644
index c88d9bb..0000000
--- 
a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.dao;
-
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Created on 12/31/15.
- */
-public class TestSchemaDao {
-
-    @Ignore
-    @Test
-    public void test() throws Exception {
-        AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl("localhost", 
9099, "admin", "secret");
-        List<AlertStreamSchemaEntity> entities = 
dao.findAlertStreamSchemaByApplication("hdfsAuditLog");
-        System.out.print(entities);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/pom.xml 
b/eagle-examples/eagle-topology-example/pom.xml
deleted file mode 100644
index 5d4a419..0000000
--- a/eagle-examples/eagle-topology-example/pom.xml
+++ /dev/null
@@ -1,68 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>eagle-examples</artifactId>
-        <groupId>org.apache.eagle</groupId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>eagle-topology-example</artifactId>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-query-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <resources>
-            <resource>
-                <directory>src/resources</directory>
-            </resource>
-        </resources>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    
<descriptor>src/assembly/eagle-topology-example-assembly.xml</descriptor>
-                    
<finalName>eagle-topology-example-${project.version}</finalName>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <configuration>
-                            <tarLongFileMode>posix</tarLongFileMode>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
 
b/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
deleted file mode 100644
index 0acf619..0000000
--- 
a/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-  <id>assembly</id>
-    <formats>
-        <format>jar</format>
-    </formats>
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <dependencySets>
-        <dependencySet>
-            <outputDirectory>/</outputDirectory>
-            <useProjectArtifact>false</useProjectArtifact>
-            <unpack>true</unpack>
-            <scope>runtime</scope>
-            <unpackOptions>
-                <excludes>
-                    <exclude>**/application.conf</exclude>
-                    <exclude>**/defaults.yaml</exclude>
-                    <exclude>**/storm.yaml</exclude>
-                    <exclude>**/storm.yaml.1</exclude>
-                    <exclude>**/log4j.properties</exclude>
-                </excludes>
-            </unpackOptions>
-            <excludes>
-                <exclude>org.apache.storm:storm-core</exclude>
-                <exclude>org.slf4j:slf4j-api</exclude>
-                <exclude>org.slf4j:log4j-over-slf4j</exclude>
-                <exclude>org.slf4j:slf4j-log4j12</exclude>
-                <exclude>log4j:log4j</exclude>
-                <exclude>asm:asm</exclude>
-                <exclude>org.apache.log4j.wso2:log4j</exclude>
-            </excludes>
-        </dependencySet>
-    </dependencySets>
-    <fileSets>
-        <fileSet>
-            <directory>${project.build.outputDirectory}</directory>
-            <outputDirectory>/</outputDirectory>
-            <excludes>
-               <exclude>application.conf</exclude>
-               <exclude>log4j.properties</exclude>
-               <exclude>**/storm.yaml.1</exclude>
-            </excludes>
-        </fileSet>
-    </fileSets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
 
b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
deleted file mode 100644
index a5e39a5..0000000
--- 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.example.notificationplugin;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Created on 2/16/16.
- */
-public class NotificationPluginTestMain {
-    public static void main(String[] args){
-        System.setProperty("config.resource", "/application-plugintest.conf");
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
-        
env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).nameAs("testSpout").alertWithConsumer("testStream",
 "testExecutor");
-        env.execute();
-    }
-
-    public static StormSpoutProvider createProvider(Config config) {
-        return new StormSpoutProvider(){
-
-            @Override
-            public BaseRichSpout getSpout(Config context) {
-                return new TestSpout();
-            }
-        };
-    }
-
-    public static class TestSpout extends BaseRichSpout {
-        private static final Logger LOG = 
LoggerFactory.getLogger(TestSpout.class);
-        private SpoutOutputCollector collector;
-        public TestSpout() {
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        }
-
-        @Override
-        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(5000);
-            LOG.info("emitted tuple ...");
-            Map<String, Object> map = new TreeMap<>();
-            map.put("testAttribute", "testValue");
-            collector.emit(new Values("testStream", map));
-        }
-    }
-}
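
TestSpout emits two-value tuples but leaves declareOutputFields empty, presumably because the DSL's withOutputFields(2) supplies the field names further downstream. Outside that wiring a Storm spout has to declare the fields it emits itself; a minimal standalone sketch (field names hypothetical):

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    import java.util.Collections;
    import java.util.Map;

    public class StandaloneTestSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The names are illustrative; they only have to match what nextTuple() emits.
            declarer.declare(new Fields("streamName", "attributes"));
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            Utils.sleep(5000); // throttle the test stream
            collector.emit(new Values("testStream", Collections.singletonMap("testAttribute", "testValue")));
        }
    }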

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
 
b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
deleted file mode 100644
index 58dfe48..0000000
--- 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.example.persist;
-
-import 
org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-
-/**
- * Created on 1/4/16.
- */
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
-    @Override
-    public Object deserialize(byte[] arg0) {
-        String logLine = new String(arg0);
-
-        return null;
-    }
-}
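
As written, deserialize() builds logLine and then returns null, so every Kafka message would be dropped; presumably the decoded line should be returned. A minimal sketch against the same SpoutKafkaMessageDeserializer interface (only the deserialize(byte[]) method visible in this diff is assumed):

    import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;

    import java.nio.charset.StandardCharsets;

    public class LogLineDeserializer implements SpoutKafkaMessageDeserializer {

        @Override
        public Object deserialize(byte[] payload) {
            // Decode with an explicit charset and hand the line downstream instead of dropping it.
            return payload == null ? null : new String(payload, StandardCharsets.UTF_8);
        }
    }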

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
 
b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
deleted file mode 100644
index 8f105fc..0000000
--- 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.example.persist;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StorageType;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.PartitionStrategy;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Created on 1/4/16.
- *
- * This test demonstrates how a user could use the new aggregate and persist features
- * for cases like metrics processing and storage.
- *
- */
-public class PersistTopoTestMain {
-
-    public static void main(String[] args) {
-//        System.setProperty("config.resource", "application.conf");
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
-        StormSpoutProvider provider = createProvider(env.getConfig());
-        execWithDefaultPartition(env, provider);
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void execWithDefaultPartition(StormExecutionEnvironment env, 
StormSpoutProvider provider) {
-        StreamProducer source = 
env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer");
-        StreamProducer filter = source;
-
-        // required: the persistTestEventStream schema must be created in the metadata manager
-        // required: the policy for aggregateExecutor1 must be created in the metadata manager
-        StreamProducer aggregate = 
filter.aggregate(Arrays.asList("persistTestEventStream"), "aggregateExecutor1", 
new PartitionStrategy() {
-            @Override
-            public int balance(String key, int buckNum) {
-                return 0;
-            }
-        });
-
-        StreamProducer persist = aggregate.persist("persistExecutor1", 
StorageType.KAFKA());
-
-        env.execute();
-    }
-
-    public static StormSpoutProvider createProvider(Config config) {
-
-        return new StormSpoutProvider(){
-
-            @Override
-            public BaseRichSpout getSpout(Config context) {
-                return new StaticMetricSpout();
-            }
-        };
-    }
-
-    public static class StaticMetricSpout extends BaseRichSpout {
-
-        private long base;
-        private SpoutOutputCollector collector;
-
-        public StaticMetricSpout() {
-            base = System.currentTimeMillis();
-        }
-
-        private Random cpuRandom = new Random();
-        private Random memRandom = new Random();
-        private static final long FULL_MEM_SIZE_BYTES = 512L * 1024 * 1024 * 1024; // 512 GB upper bound (the L suffix keeps the multiplication in long arithmetic)
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("timestamp", "host", "cpu", "mem"));
-        }
-
-        @Override
-        public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
-            this.collector = collector;
-        }
-
-        @Override
-        public void nextTuple() {
-            Utils.sleep(100);
-            base = base + 100;// with fix steps..
-            long mem = Double.valueOf(memRandom.nextGaussian() * 
FULL_MEM_SIZE_BYTES).longValue();
-            collector.emit(new Values(base, "host", cpuRandom.nextInt(100), 
mem));
-        }
-    }
-
-}
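
One more note on StaticMetricSpout: nextGaussian() is centred on zero and unbounded, so the scaled mem sample is frequently negative and occasionally far above the cap. If the intent is a plausible memory gauge, a uniform sample over [0, cap) is easier to reason about; a small sketch (names hypothetical):

    import java.util.Random;

    public class BoundedMemSample {

        private static final long FULL_MEM_SIZE_BYTES = 512L * 1024 * 1024 * 1024; // 512 GB cap
        private static final Random MEM_RANDOM = new Random();

        // nextDouble() returns a value in [0, 1), keeping the fake metric inside the cap.
        static long sampleMemBytes() {
            return (long) (MEM_RANDOM.nextDouble() * FULL_MEM_SIZE_BYTES);
        }

        public static void main(String[] args) {
            for (int i = 0; i < 3; i++) {
                System.out.println(sampleMemBytes());
            }
        }
    }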

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
 
b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
deleted file mode 100644
index d9a7bbb..0000000
--- 
a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.example.persist;
-
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StorageType;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.PartitionStrategy;
-
-import java.util.Arrays;
-
-/**
- * Created on 1/10/16.
- */
-public class PersistTopoTestMain2 {
-
-    public static void main(String[] args) {
-        System.setProperty("config.resource", "application.conf");// customize 
the application.conf
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
-        StormSpoutProvider provider = 
PersistTopoTestMain.createProvider(env.getConfig());
-        exec(env, provider);
-    }
-
-    private static void exec(StormExecutionEnvironment env, StormSpoutProvider 
provider) {
-        StreamProducer source = 
env.fromSpout(provider).withOutputFields(4).nameAs("kafkaMsgConsumer");
-        StreamProducer filter = source;
-
-        // "timestamp", "host", "cpu", "mem"
-        String cql = " define stream eagleQuery(eagleAlertContext object, 
timestamp long, host string, cpu int, mem long);"
-                + " @info(name='query')"
-                + " from eagleQuery#window.externalTime(timestamp, 10 min) "
-                + " select eagleAlertContext, min(timestamp) as starttime, 
avg(cpu) as avgCpu, avg(mem) as avgMem insert into tmp;";
-        StreamProducer aggregate = 
filter.aggregate(Arrays.asList("ealgeQuery"), cql, new PartitionStrategy() {
-            @Override
-            public int balance(String key, int buckNum) {
-                return 0;
-            }
-        });
-
-        StreamProducer persist = aggregate.persist("persistExecutor1", 
StorageType.KAFKA());
-
-        env.execute();
-    }
-}
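
The stream id passed to aggregate(...) has to match the stream defined inside the CQL (eagleQuery above). To exercise the same query outside the topology, the Siddhi runtime can be driven directly; a minimal sketch, assuming the Siddhi 3.x SiddhiManager/ExecutionPlanRuntime API this module builds against, with the eagleAlertContext attribute left out for simplicity:

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.query.output.callback.QueryCallback;
    import org.wso2.siddhi.core.stream.input.InputHandler;

    public class ExternalTimeWindowDemo {

        public static void main(String[] args) throws Exception {
            // Same shape as the cql string above, minus the eagleAlertContext object attribute.
            String plan = "define stream eagleQuery (timestamp long, host string, cpu int, mem long);"
                    + " @info(name='query')"
                    + " from eagleQuery#window.externalTime(timestamp, 10 min)"
                    + " select min(timestamp) as starttime, avg(cpu) as avgCpu, avg(mem) as avgMem"
                    + " insert into tmp;";

            SiddhiManager siddhiManager = new SiddhiManager();
            ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(plan);
            runtime.addCallback("query", new QueryCallback() {
                @Override
                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
                    if (inEvents == null) {
                        return;
                    }
                    for (Event event : inEvents) {
                        System.out.println("starttime/avgCpu/avgMem = " + java.util.Arrays.toString(event.getData()));
                    }
                }
            });

            runtime.start();
            InputHandler input = runtime.getInputHandler("eagleQuery");
            long base = System.currentTimeMillis();
            for (int i = 0; i < 5; i++) {
                // externalTime advances on the event's own timestamp attribute, not the wall clock.
                input.send(new Object[]{base + i * 100L, "host", 42 + i, 1024L * 1024 * (100 + i)});
            }
            runtime.shutdown();
            siddhiManager.shutdown();
        }
    }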

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
----------------------------------------------------------------------
diff --git 
a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
 
b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
deleted file mode 100644
index e9288fa..0000000
--- 
a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-CUR_DIR=$(dirname $0)
-source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
-
-##### delete email notification ##########
-echo ""
-echo "Importing policy ... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 
'Content-Type:application/json' \
- 
"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService";
 \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "dataSource": "testSpout",
-         "policyId": "pluginTestPolicy",
-         "alertExecutorId": "testExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "pluginTest",
-       "policyDef": "{\"expression\":\"from testStream[(testAttribute == 
\\\"testValue\\\")] select * insert into 
outputStream;\",\"type\":\"siddhiCEPEngine\"}",
-       "notificationDef": 
"[{\"notificationType\":\"eagleStore\"},{\"subject\":\"just for 
test\",\"sender\":\"[email protected]\",\"recipients\":\"[email protected]\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
-       "remediationDef":"",
-       "enabled":true
-     }
- ]
- '
-
-## Finished
-echo ""
-echo "Finished initialization for NotificationPluginTest"
-
-exit 0
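
For completeness, the same policy import can be issued from Java using only the JDK (Java 8 for the Base64 encoder). A minimal sketch in which the host, port and credentials are placeholders standing in for the eagle-env.sh variables, and the JSON payload is trimmed to the identifying tags:

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class PolicyImportClient {

        public static void main(String[] args) throws Exception {
            // Placeholder endpoint and credentials; substitute the values from eagle-env.sh.
            String endpoint = "http://localhost:9099/eagle-service/rest/entities?serviceName=AlertDefinitionService";
            String auth = Base64.getEncoder().encodeToString("admin:secret".getBytes(StandardCharsets.UTF_8));
            String body = "[{\"prefix\":\"alertdef\",\"tags\":{\"site\":\"sandbox\",\"dataSource\":\"testSpout\","
                    + "\"policyId\":\"pluginTestPolicy\",\"alertExecutorId\":\"testExecutor\",\"policyType\":\"siddhiCEPEngine\"},"
                    + "\"description\":\"pluginTest\",\"enabled\":true}]";

            HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setRequestProperty("Authorization", "Basic " + auth);
            conn.setDoOutput(true);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("HTTP " + conn.getResponseCode());
            conn.disconnect();
        }
    }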

