http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java deleted file mode 100644 index 30d2179..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.eagle.policy.executor;

import com.codahale.metrics.MetricRegistry;
import com.sun.jersey.client.impl.CopyOnWriteHashMap;
import com.typesafe.config.Config;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.dataproc.core.ValuesArray;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor2;
import org.apache.eagle.metric.reportor.EagleCounterMetric;
import org.apache.eagle.metric.reportor.EagleMetricListener;
import org.apache.eagle.metric.reportor.EagleServiceReporterMetricListener;
import org.apache.eagle.metric.reportor.MetricKeyCodeDecoder;
import org.apache.eagle.policy.*;
import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
import org.apache.eagle.policy.dao.AlertStreamSchemaDAOImpl;
import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
import org.apache.eagle.policy.siddhi.StreamMetadataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

/**
 * The stream process executor based on two types.
 *
 * <p>On {@link #init()} it loads the active policies for the configured site/application, keeps the
 * ones that the partitioner assigns to this partition, and creates one {@link PolicyEvaluator} per
 * policy. Every incoming event is then handed to each evaluator that is not marked down. Policy
 * additions, changes and deletions are received through the {@link PolicyLifecycleMethods} callbacks.
 *
 * @since Dec 17, 2015
 *
 * @param <T> - The policy definition entity type
 * @param <K> - The stream entity type
 */
public abstract class PolicyProcessExecutor<T extends AbstractPolicyDefinitionEntity, K>
        extends JavaStormStreamExecutor2<String, K>
        implements PolicyLifecycleMethods<T>, PolicyDistributionReportMethods, IPolicyExecutor<T, K>
{

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(PolicyProcessExecutor.class);

    public static final String EAGLE_EVENT_COUNT = "eagle.event.count";
    public static final String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
    public static final String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
    public static final String EAGLE_ALERT_COUNT = "eagle.alert.count";
    public static final String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";

    /** Granularity handed to every {@link EagleCounterMetric} created by this executor. */
    private static final long METRIC_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;

    private final Class<T> policyDefinitionClz;
    private String executorId;
    private volatile CopyOnWriteHashMap<String, PolicyEvaluator<T>> policyEvaluators;
    private PolicyPartitioner partitioner;
    private int numPartitions;
    private int partitionSeq;
    private Config config;
    private Map<String, Map<String, T>> initialAlertDefs;
    private String[] sourceStreams;

    /**
     * metricMap's key = metricName[#policyId]
     */
    private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
    private Map<String, String> baseDimensions;

    private MetricRegistry registry;
    private EagleMetricListener listener;

    private PolicyDefinitionDAO<T> policyDefinitionDao;

    /**
     * @param alertExecutorId    logical id of this executor
     * @param partitioner        decides which partition owns a given policy
     * @param numPartitions      total number of partitions
     * @param partitionSeq       sequence number of the partition served by this instance
     * @param alertDefinitionDao DAO used to load policies and to persist markdown details
     * @param sourceStreams      names of the source streams passed to every evaluator
     * @param clz                concrete policy definition entity class
     */
    public PolicyProcessExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
                                 PolicyDefinitionDAO<T> alertDefinitionDao, String[] sourceStreams, Class<T> clz){
        this.executorId = alertExecutorId;
        this.partitioner = partitioner;
        this.numPartitions = numPartitions;
        this.partitionSeq = partitionSeq;
        this.policyDefinitionDao = alertDefinitionDao;
        this.sourceStreams = sourceStreams;
        this.policyDefinitionClz = clz;
    }

    public String getExecutorId(){
        return this.executorId;
    }

    public int getNumPartitions() {
        return this.numPartitions;
    }

    public int getPartitionSeq(){
        return this.partitionSeq;
    }

    public PolicyPartitioner getPolicyPartitioner() {
        return this.partitioner;
    }

    public Map<String, Map<String, T>> getInitialAlertDefs() {
        return this.initialAlertDefs;
    }

    public PolicyDefinitionDAO<T> getPolicyDefinitionDao() {
        return policyDefinitionDao;
    }

    public Map<String, PolicyEvaluator<T>> getPolicyEvaluators(){
        return policyEvaluators;
    }

    @Override
    public void prepareConfig(Config config) {
        this.config = config;
    }

    /**
     * Creates the metric registry and listener and prepares the dimensions shared by all metrics
     * of this executor. Reads host/port (mandatory) and username/password (optional) of the
     * Eagle service from the config.
     */
    private void initMetricReportor() {
        String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
        String host = config.getString(servicePrefix + EagleConfigConstants.HOST);
        int port = config.getInt(servicePrefix + EagleConfigConstants.PORT);

        String usernamePath = servicePrefix + EagleConfigConstants.USERNAME;
        String passwordPath = servicePrefix + EagleConfigConstants.PASSWORD;
        String username = config.hasPath(usernamePath) ? config.getString(usernamePath) : null;
        String password = config.hasPath(passwordPath) ? config.getString(passwordPath) : null;

        registry = new MetricRegistry();
        listener = new EagleServiceReporterMetricListener(host, port, username, password);

        baseDimensions = new HashMap<String, String>();
        baseDimensions.put(Constants.ALERT_EXECUTOR_ID, executorId);
        baseDimensions.put(Constants.PARTITIONSEQ, String.valueOf(partitionSeq));
        baseDimensions.put(Constants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
        baseDimensions.put(EagleConfigConstants.APPLICATION, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION));
        baseDimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));

        dimensionsMap = new HashMap<String, Map<String, String>>();
    }

    public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
        return new AlertStreamSchemaDAOImpl(config);
    }

    /**
     * Loads the active policies, builds the evaluators owned by this partition, registers this
     * executor with the dynamic policy loader and finally sets up metric reporting.
     *
     * @throws IllegalStateException if the initial policies cannot be loaded or an evaluator
     *                               cannot be created
     */
    @Override
    public void init() {
        // initialize StreamMetadataManager before it is used
        StreamMetadataManager.getInstance().init(config, getAlertStreamSchemaDAO(config));
        // for each AlertDefinition, to create a PolicyEvaluator
        Map<String, PolicyEvaluator<T>> tmpPolicyEvaluators = new HashMap<String, PolicyEvaluator<T>>();

        String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
        String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.APPLICATION);
        try {
            initialAlertDefs = policyDefinitionDao.findActivePoliciesGroupbyExecutorId(site, application);
        }
        catch (Exception ex) {
            LOG.error("fail to initialize initialAlertDefs: ", ex);
            throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
        }
        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
            LOG.warn("No alert definitions was found for site: " + site + ", application: " + application);
        }
        else if (initialAlertDefs.get(executorId) != null) {
            for(T alertDef : initialAlertDefs.get(executorId).values()){
                int part = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
                if (part == partitionSeq) {
                    tmpPolicyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), createPolicyEvaluator(alertDef));
                }
            }
        }

        policyEvaluators = new CopyOnWriteHashMap<>();
        // for efficiency, we don't put single policy evaluator
        policyEvaluators.putAll(tmpPolicyEvaluators);
        DynamicPolicyLoader<T> policyLoader = DynamicPolicyLoader.getInstanceOf(policyDefinitionClz);
        policyLoader.init(initialAlertDefs, policyDefinitionDao, config);
        String fullQualifiedAlertExecutorId = executorId + "_" + partitionSeq;
        policyLoader.addPolicyChangeListener(fullQualifiedAlertExecutorId, this);
        policyLoader.addPolicyDistributionReporter(fullQualifiedAlertExecutorId, this);
        LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
        LOG.info("All policy evaluators: " + policyEvaluators);

        initMetricReportor();
    }

    /**
     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
     *
     * @param alertDef alert definition
     * @return PolicyEvaluator instance
     * @throws IllegalStateException if no evaluator class is mapped to the policy type, or the
     *                               evaluator cannot be instantiated from the policy definition
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    protected PolicyEvaluator<T> createPolicyEvaluator(T alertDef){
        String policyType = alertDef.getTags().get(Constants.POLICY_TYPE);
        Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
        if(evalCls == null){
            String msg = "No policy evaluator defined for policy type : " + policyType;
            LOG.error(msg);
            throw new IllegalStateException(msg);
        }

        // check out whether strong incoming data validation is necessary
        String needValidationConfigKey= Constants.ALERT_EXECUTOR_CONFIGS + "." + executorId + ".needValidation";

        // Default: true
        boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);

        AbstractPolicyDefinition policyDef = null;
        PolicyEvaluator<T> pe;
        try {
            policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
                    PolicyManager.getInstance().getPolicyModules(policyType));

            PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
            context.policyId = alertDef.getTags().get("policyId");
            context.alertExecutor = this;
            context.resultRender = this.getResultRender();
            // create evaluator instance
            pe = (PolicyEvaluator<T>) evalCls
                    .getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
                    .newInstance(config, context, policyDef, sourceStreams, needValidation);
            if (pe.isMarkdownEnabled()) { // updating markdown details only if the policy is found invalid
                updateMarkdownDetails(alertDef, pe.isMarkdownEnabled(), pe.getMarkdownReason());
            }
        } catch(Exception ex) {
            LOG.error("Fail creating new policyEvaluator", ex);
            LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
            throw new IllegalStateException(ex);
        }
        return pe;
    }

    /**
     * verify both alertExecutor logic name and partition id
     * @param alertDef alert definition
     *
     * @return whether accept the alert definition
     */
    private boolean accept(T alertDef){
        String executorID = alertDef.getTags().containsKey("executorId") ? alertDef.getTags().get("executorId")
                : alertDef.getTags().get("alertExecutorId");

        // a definition carrying neither executor tag cannot belong to this executor
        if(executorID == null || !executorID.equals(executorId)) {
            if(LOG.isDebugEnabled()){
                LOG.debug("alertDef does not belong to this alertExecutorId : " + executorId + ", alertDef : " + alertDef);
            }
            return false;
        }
        int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
        return targetPartitionSeq == partitionSeq;
    }

    /**
     * Adds {@code value} to the counter identified by {@code name} and {@code dimensions},
     * registering the counter (and attaching the listener) on first use.
     */
    private void updateCounter(String name, Map<String, String> dimensions, double value) {
        long current = System.currentTimeMillis();
        String metricName = MetricKeyCodeDecoder.codeMetricKey(name, dimensions);
        EagleCounterMetric existing = (EagleCounterMetric) registry.getMetrics().get(metricName);
        if (existing == null) {
            EagleCounterMetric metric = new EagleCounterMetric(current, metricName, value, METRIC_GRANULARITY);
            metric.registerListener(listener);
            registry.register(metricName, metric);
        } else {
            existing.update(value, current);
            // TODO: need remove unused metric from registry
        }
    }

    private void updateCounter(String name, Map<String, String> dimensions) {
        updateCounter(name, dimensions, 1.0);
    }

    /**
     * Returns the cached metric dimensions for a policy: the base dimensions plus the policy id.
     *
     * @param policyId policy identifier
     * @return dimension map for the policy, created and cached on first request
     */
    protected Map<String, String> getDimensions(String policyId) {
        Map<String, String> dimensions = dimensionsMap.get(policyId);
        if (dimensions == null) {
            dimensions = new HashMap<String, String>(baseDimensions);
            dimensions.put(Constants.POLICY_ID, policyId);
            dimensionsMap.put(policyId, dimensions);
        }
        return dimensions;
    }

    /**
     * within this single executor, execute all PolicyEvaluator sequentially
     * the contract for input:
     * 1. total # of fields for input is 3, which is fixed
     * 2. the first field is key
     * 3. the second field is stream name
     * 4. the third field is value which is java SortedMap
     *
     * A failure inside one evaluator is logged and counted as an evaluation failure; the
     * remaining evaluators still see the event.
     */
    @Override
    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, K>> outputCollector){
        if(input.size() != 3) {
            throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
        }
        if(LOG.isDebugEnabled()) {
            LOG.debug("Msg is coming " + input.get(2));
            LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
        }

        updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
        try{
            synchronized(this.policyEvaluators) {
                for(Entry<String, PolicyEvaluator<T>> entry : policyEvaluators.entrySet()){
                    String policyId = entry.getKey();
                    PolicyEvaluator<T> evaluator = entry.getValue();
                    if (evaluator.isMarkdownEnabled()) { // not evaluated for a marked down policy
                        continue;
                    }
                    updateCounter(EAGLE_POLICY_EVAL_COUNT, getDimensions(policyId));
                    try {
                        evaluator.evaluate(new ValuesArray(outputCollector, input.get(1), input.get(2)));
                    } catch (Exception ex) {
                        LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
                        // count the failure separately; the attempt itself was already counted above
                        updateCounter(EAGLE_POLICY_EVAL_FAIL_COUNT, getDimensions(policyId));
                    }
                }
            }
        } catch(Exception ex){
            LOG.error(executorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex);
            updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
        }
    }

    @Override
    public void onPolicyCreated(Map<String, T> added) {
        if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
        for(T alertDef : added.values()){
            if(!accept(alertDef)) {
                continue;
            }
            LOG.info(executorId + ", partition " + partitionSeq + " policy really added " + alertDef);
            PolicyEvaluator<T> newEvaluator = createPolicyEvaluator(alertDef);
            if(newEvaluator != null){
                synchronized(this.policyEvaluators) {
                    policyEvaluators.put(alertDef.getTags().get(Constants.POLICY_ID), newEvaluator);
                }
            }
        }
    }

    @Override
    public void onPolicyChanged(Map<String, T> changed) {
        if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy changed : " + changed);
        for(T alertDef : changed.values()){
            if(!accept(alertDef)) {
                continue;
            }
            LOG.info(executorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
            synchronized(this.policyEvaluators) {
                String policyId = alertDef.getTags().get(Constants.POLICY_ID);
                PolicyEvaluator<T> pe = policyEvaluators.get(policyId);
                if (pe == null) {
                    // no evaluator is registered under this id, so there is nothing to update
                    LOG.warn(executorId + ", partition " + partitionSeq + " no policy evaluator found for changed policy " + policyId);
                    continue;
                }
                boolean previousMarkdown = pe.isMarkdownEnabled();
                String previousMarkdownReason = pe.getMarkdownReason();
                pe.onPolicyUpdate(alertDef);
                if (isMarkdownUpdateRequired(previousMarkdown, pe.isMarkdownEnabled(), previousMarkdownReason, pe.getMarkdownReason())) {
                    updateMarkdownDetails(alertDef, pe.isMarkdownEnabled(), pe.getMarkdownReason());
                }
            }
        }
    }

    @Override
    public void onPolicyDeleted(Map<String, T> deleted) {
        if(LOG.isDebugEnabled()) LOG.debug(executorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
        for(T alertDef : deleted.values()){
            if(!accept(alertDef)) {
                continue;
            }
            LOG.info(executorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
            String policyId = alertDef.getTags().get(Constants.POLICY_ID);
            synchronized(this.policyEvaluators) {
                if (policyEvaluators.containsKey(policyId)) {
                    PolicyEvaluator<T> pe = policyEvaluators.remove(policyId);
                    pe.onPolicyDelete();
                }
            }
        }
    }

    /**
     * Emits every alert produced by a policy evaluation to the output collector held by the
     * evaluation context, keyed by the policy id, and counts them.
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts) {
        if(alerts == null || alerts.isEmpty()){
            return;
        }
        String policyId = context.policyId;
        LOG.info(String.format("Detected %d alerts for policy %s", alerts.size(), policyId));
        Collector outputCollector = context.outputCollector;
        PolicyEvaluator<T> evaluator = context.evaluator;
        updateCounter(EAGLE_ALERT_COUNT, getDimensions(policyId), alerts.size());
        for (K entity : alerts) {
            synchronized(this) {
                outputCollector.collect(new Tuple2(policyId, entity));
            }
            if(LOG.isDebugEnabled()) LOG.debug("A new alert is triggered: " + executorId + ", partition " + partitionSeq + ", Got an alert with output context: " + entity + ", for policy " + evaluator);
        }
    }

    public abstract ResultRender<T, K> getResultRender();

    @Override
    public void report() {
        PolicyDistroStatsLogReporter appender = new PolicyDistroStatsLogReporter();
        appender.reportPolicyMembership(executorId + "_" + partitionSeq, policyEvaluators.keySet());
    }

    /**
     * Method to check if updating markdown details in DB is required.
     * @return boolean: If markdown details needs to be updated for the policy
     */
    private boolean isMarkdownUpdateRequired (boolean previousMarkdown, boolean presentMarkdown, String previousReason, String presentReason) {
        if (!previousMarkdown && !presentMarkdown) { // not updating when previous/present policies are both valid
            return false;
        }
        if (previousMarkdown && presentMarkdown) {
            // not updating when there is no change with the markdown reason (either reason may be null)
            return !Objects.equals(previousReason, presentReason);
        }
        return true;
    }

    /**
     * Method to invoke Eagle Service call to update the markdown details for the policy.
     * The entity is cast to {@link AlertDefinitionAPIEntity}; a null reason is stored as "".
     */
    private void updateMarkdownDetails(T entity, boolean markdownEnabled, String markdownReason) {
        AlertDefinitionAPIEntity alertEntity = (AlertDefinitionAPIEntity) entity;
        alertEntity.setMarkdownEnabled(markdownEnabled);
        alertEntity.setMarkdownReason(null != markdownReason ? markdownReason : "");
        policyDefinitionDao.updatePolicyDetails(entity);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java deleted file mode 100644 index 22506aa..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiOutputStreamCallback.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.eagle.policy.siddhi;

import com.typesafe.config.Config;
import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
import org.apache.eagle.policy.PolicyEvaluationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.output.StreamCallback;

import java.util.LinkedList;
import java.util.List;

/**
 * Siddhi stream call back
 *
 * Renders every received Siddhi event into an alert and forwards the batch to the alert
 * executor referenced by the evaluation context found in the event data.
 *
 * Created on 1/20/16.
 */
public class SiddhiOutputStreamCallback<T extends AbstractPolicyDefinitionEntity, K> extends StreamCallback {

    public static final Logger LOG = LoggerFactory.getLogger(SiddhiOutputStreamCallback.class);

    private SiddhiPolicyEvaluator<T, K> evaluator;
    public Config config;

    public SiddhiOutputStreamCallback(Config config, SiddhiPolicyEvaluator<T, K> evaluator) {
        this.config = config;
        this.evaluator = evaluator;
    }

    /**
     * Converts the events into alerts and reports them through
     * {@code alertExecutor.onEvalEvents}. The evaluation context is taken from the first
     * element of the first event's data array and reused for the whole batch.
     *
     * @param events events emitted by Siddhi for the output stream
     */
    @Override
    @SuppressWarnings("unchecked")
    public void receive(Event[] events) {
        long timeStamp = System.currentTimeMillis();
        List<K> alerts = new LinkedList<>();
        PolicyEvaluationContext<T, K> siddhiContext = null;

        for (Event event : events) {
            Object[] data = event.getData();
            // resolve the context BEFORE it is used for rendering; doing it afterwards
            // dereferences null on the very first event
            if (siddhiContext == null) {
                siddhiContext = (PolicyEvaluationContext<T, K>) data[0];
            }
            List<Object> returns = SiddhiQueryCallbackImpl.getOutputObject(data);
            K alert = siddhiContext.resultRender.render(config, returns, siddhiContext, timeStamp);
            alerts.add(alert);
        }

        if (siddhiContext != null) {
            siddhiContext.alertExecutor.onEvalEvents(siddhiContext, alerts);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java 
b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java deleted file mode 100644 index 639c15b..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyDefinition.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/
package org.apache.eagle.policy.siddhi;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.eagle.policy.config.AbstractPolicyDefinition;

/**
 * Policy definition for the Siddhi engine. Its JSON form looks like
 * {
      "type":"SiddhiCEPEngine",
      "expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; "
   }
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
@JsonIgnoreProperties(ignoreUnknown = true)
public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {

    /** The Siddhi query text of the policy. */
    private String expression;

    /** Flag stored alongside the expression; see {@link #isContainsDefinition()}. */
    private boolean containsDefinition;

    /** @return the Siddhi expression of this policy */
    public String getExpression() {
        return this.expression;
    }

    /** @param expression the Siddhi expression of this policy */
    public void setExpression(String expression) {
        this.expression = expression;
    }

    /** @return the current value of the containsDefinition flag */
    public boolean isContainsDefinition() {
        return this.containsDefinition;
    }

    /** @param containsDefinition new value of the containsDefinition flag */
    public void setContainsDefinition(boolean containsDefinition) {
        this.containsDefinition = containsDefinition;
    }

    /** The string form of a definition is simply its expression. */
    @Override
    public String toString(){
        return this.expression;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java deleted file mode 100644 index cbee286..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java +++ /dev/null @@ -1,356 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.siddhi; - -import com.lmax.disruptor.ExceptionHandler; -import com.lmax.disruptor.FatalExceptionHandler; -import com.typesafe.config.Config; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.apache.eagle.dataproc.core.JsonSerDeserUtils; -import org.apache.eagle.dataproc.core.ValuesArray; -import org.apache.eagle.datastream.Collector; -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.apache.eagle.policy.PolicyEvaluator; -import org.apache.eagle.policy.PolicyManager; -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.policy.config.AbstractPolicyDefinition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.ExecutionPlanRuntime; -import org.wso2.siddhi.core.SiddhiManager; -import org.wso2.siddhi.core.query.output.callback.QueryCallback; -import org.wso2.siddhi.core.stream.input.InputHandler; -import org.wso2.siddhi.query.api.execution.query.Query; -import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute; -import 
org.wso2.siddhi.query.compiler.exception.SiddhiParserException; - -import java.io.Serializable; -import java.lang.reflect.Field; -import java.util.*; - -/** - * when policy is updated or deleted, SiddhiManager.shutdown should be invoked to release resources. - * during this time, synchronization is important - */ -public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T> { - - private final static String EXECUTION_PLAN_NAME = "query"; - private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class); - - private volatile SiddhiRuntime siddhiRuntime; - private final String[] sourceStreams; - private final boolean needValidation; - private final Config config; - private final PolicyEvaluationContext<T, K> context; - - /** - * everything dependent on policyDef should be together and switched in runtime - */ - public static class SiddhiRuntime { - QueryCallback queryCallback; - Map<String, InputHandler> siddhiInputHandlers; - SiddhiManager siddhiManager; - SiddhiPolicyDefinition policyDef; - List<String> outputFields; - String executionPlanName; - boolean markdownEnabled; - String markdownReason; - } - - public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams) { - this(config, context, policyDef, sourceStreams, false); - } - - public SiddhiPolicyEvaluator(Config config, PolicyEvaluationContext<T, K> context, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation) { - this.config = config; - this.context = context; - this.context.evaluator = this; - this.needValidation = needValidation; - this.sourceStreams = sourceStreams; - init(policyDef); - } - - public void init(AbstractPolicyDefinition policyDef) { - siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef); - } - - public static String addContextFieldIfNotExist(String expression) { - // select fieldA, fieldB --> 
select eagleAlertContext, fieldA, fieldB - int pos = expression.indexOf("select ") + 7; - int index = pos; - boolean isSelectStarPattern = true; - while (index < expression.length()) { - if (expression.charAt(index) == ' ') index++; - else if (expression.charAt(index) == '*') break; - else { - isSelectStarPattern = false; - break; - } - } - if (isSelectStarPattern) return expression; - StringBuilder sb = new StringBuilder(); - sb.append(expression.substring(0, pos)); - sb.append(SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD + ","); - sb.append(expression.substring(pos, expression.length())); - return sb.toString(); - } - - private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef) { - SiddhiManager siddhiManager = new SiddhiManager(); - Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>(); - SiddhiRuntime runtime = new SiddhiRuntime(); - - // compose execution plan sql - String executionPlan = policyDef.getExpression(); - if (!policyDef.isContainsDefinition()) { - StringBuilder sb = new StringBuilder(); - for (String sourceStream : sourceStreams) { - String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream); - LOG.info("Siddhi stream definition : " + streamDef); - sb.append(streamDef); - } - - String expression = policyDef.getExpression(); - executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + expression; - } - - ExecutionPlanRuntime executionPlanRuntime = null; - - try { - executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan); - executionPlanRuntime.handleExceptionWith(new SiddhiPolicyExceptionHandler()); - - for (String sourceStream : sourceStreams) { - siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream)); - } - - executionPlanRuntime.start(); - LOG.info("Siddhi query: " + executionPlan); - attachCallback(runtime, executionPlanRuntime, context); - - runtime.markdownEnabled = false; - 
runtime.markdownReason = null; - } catch (SiddhiParserException exception) { // process is not interrupted in case of an invalid policy defined by marking down - LOG.error("Exception in parsing Siddhi query: " + executionPlan + ", reason being: " + exception.getMessage()); - runtime.queryCallback = null; - runtime.outputFields = null; - runtime.markdownEnabled = true; - runtime.markdownReason = exception.getMessage(); - } - - runtime.siddhiInputHandlers = siddhiInputHandlers; - runtime.siddhiManager = siddhiManager; - runtime.policyDef = policyDef; - runtime.executionPlanName = (null != executionPlanRuntime) ? executionPlanRuntime.getName() : null; // executionPlanRuntime will be set to null in case of an invalid policy - return runtime; - } - - private void attachCallback(SiddhiRuntime runtime, ExecutionPlanRuntime executionPlanRuntime, PolicyEvaluationContext<T, K> context) { - List<String> outputFields = new ArrayList<>(); -// String outputStreamName = config.getString("alertExecutorConfigs." + executorId + "." 
+ "outputStream"); -// if (StringUtils.isNotEmpty(outputStreamName)) { -// StreamCallback streamCallback = new SiddhiOutputStreamCallback<>(config, this); -// executionPlanRuntime.addCallback(outputStreamName, streamCallback); -// runtime.outputStreamCallback = streamCallback; -// // find output attribute from stream call back -// try { -// Field field = StreamCallback.class.getDeclaredField("streamDefinition"); -// field.setAccessible(true); -// AbstractDefinition outStreamDef = (AbstractDefinition) field.get(streamCallback); -// outputFields = Arrays.asList(outStreamDef.getAttributeNameArray()); -// } catch (Exception ex) { -// LOG.error("Got an Exception when initial outputFields ", ex); -// } -// } else { - QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, context); - executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback); - runtime.queryCallback = callback; - // find output attribute from query call back - try { - Field field = QueryCallback.class.getDeclaredField(EXECUTION_PLAN_NAME); - field.setAccessible(true); - Query query = (Query) field.get(callback); - List<OutputAttribute> list = query.getSelector().getSelectionList(); - for (OutputAttribute output : list) { - outputFields.add(output.getRename()); - } - } catch (Exception ex) { - LOG.error("Got an Exception when initial outputFields ", ex); - } -// } - runtime.outputFields = outputFields; - } - - /** - * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value - * 2. 
runtime check for input data (This is very expensive, so we ignore for now) - * the size of input map should be equal to size of attributes which stream metadata defines - * the attribute names should be equal to attribute names which stream metadata defines - * the input field cannot be null - */ - @SuppressWarnings({"rawtypes"}) - @Override - public void evaluate(ValuesArray data) throws Exception { - if (!siddhiRuntime.markdownEnabled) { - if (LOG.isDebugEnabled()) { - LOG.debug("Siddhi policy evaluator consumers data :" + data); - } - Collector outputCollector = (Collector) data.get(0); - String streamName = (String) data.get(1); - SortedMap dataMap = (SortedMap) data.get(2); - - // Get metadata keyset for the stream. - Set<String> metadataKeys = StreamMetadataManager.getInstance() - .getMetadataEntityMapForStream(streamName).keySet(); - - validateEventInRuntime(streamName, dataMap, metadataKeys); - - synchronized (siddhiRuntime) { - // retain the collector in the context. This assignment is idempotent - context.outputCollector = outputCollector; - - List<Object> input = new ArrayList<Object>(); - putAttrsIntoInputStream(input, streamName, metadataKeys, dataMap); - siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0])); - } - } - } - - /** - * This is a heavy operation, we should avoid to use. - * <p/> - * This validation method will skip invalid fields in event which are not declared in stream schema otherwise it will cause exception for siddhi engine. 
- * - * @param sourceStream source steam id - * @param data input event - * @see <a href="https://issues.apache.org/jira/browse/EAGLE-49">https://issues.apache.org/jira/browse/EAGLE-49</a> - */ - private void validateEventInRuntime(String sourceStream, SortedMap data, Set<String> metadataKeys) { - if (!needValidation) { - return; - } - - if (!metadataKeys.equals(data.keySet())) { - Set<Object> badKeys = new TreeSet<>(); - for (Object key : data.keySet()) { - if (!metadataKeys.contains(key)) { - badKeys.add(key); - } - } - LOG.warn(String.format("Ignore invalid fields %s in event: %s from stream: %s, valid fields are: %s", - badKeys.toString(), data.toString(), sourceStream, metadataKeys.toString())); - - for (Object key : badKeys) { - data.remove(key); - } - } - } - - private void putAttrsIntoInputStream(List<Object> input, String streamName, Set<String> metadataKeys, SortedMap dataMap) { - if (!needValidation) { - input.addAll(dataMap.values()); - return; - } - - // If a metadata field is not set, we put null for the field's value. 
- for (String key : metadataKeys) { - Object value = dataMap.get(key); - if (value == null) { - input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, key)); - } else { - input.add(value); - } - } - } - - @Override - public void onPolicyUpdate(T newAlertDef) { - AbstractPolicyDefinition policyDef = null; - try { - policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), - AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE))); - } catch (Exception ex) { - LOG.error("Initial policy def error, ", ex); - } - SiddhiRuntime previous = siddhiRuntime; - siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition) policyDef); - synchronized (previous) { - if (!previous.markdownEnabled) // condition to check if previous SiddhiRuntime was started after policy validation - previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown(); - } - } - - @Override - public void onPolicyDelete() { - synchronized (siddhiRuntime) { - LOG.info("Going to shutdown siddhi execution plan, planName: " + siddhiRuntime.executionPlanName); - if (!siddhiRuntime.markdownEnabled) // condition to check if previous SiddhiRuntime was started after policy validation - siddhiRuntime.siddhiManager.getExecutionPlanRuntime(siddhiRuntime.executionPlanName).shutdown(); - LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown "); - } - } - - @Override - public String toString() { - return siddhiRuntime.policyDef.toString(); - } - - public String[] getStreamNames() { - return sourceStreams; - } - - public Map<String, String> getAdditionalContext() { - Map<String, String> context = new HashMap<String, String>(); - StringBuilder sourceStreams = new StringBuilder(); - for (String streamName : getStreamNames()) { - sourceStreams.append(streamName + ","); - } - if (sourceStreams.length() > 0) { - sourceStreams.deleteCharAt(sourceStreams.length() - 1); - 
} - context.put(Constants.SOURCE_STREAMS, sourceStreams.toString()); - context.put(Constants.POLICY_ID, this.context.policyId); - return context; - } - - public List<String> getOutputStreamAttrNameList() { - return siddhiRuntime.outputFields; - } - - @Override - public boolean isMarkdownEnabled() { return siddhiRuntime.markdownEnabled; } - - @Override - public String getMarkdownReason() { return siddhiRuntime.markdownReason; } - - private static class SiddhiPolicyExceptionHandler implements Serializable, ExceptionHandler<Object> { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyExceptionHandler.class); - - public void handleEventException(Throwable ex, long sequence, Object event) { - LOG.warn("Exception processing event: " + sequence + " " + event, ex); - } - - public void handleOnStartException(Throwable ex) { - LOG.warn("Exception during onStart()", ex); - } - - public void handleOnShutdownException(Throwable ex) { - LOG.warn("Exception during onShutdown()", ex); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java deleted file mode 100644 index 1bd5830..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.siddhi; - -import java.util.Arrays; -import java.util.List; - -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.policy.PolicyEvaluatorServiceProvider; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; - -public class SiddhiPolicyEvaluatorServiceProviderImpl<T extends AbstractPolicyDefinitionEntity> implements PolicyEvaluatorServiceProvider<T> { - @Override - public String getPolicyType() { - return Constants.policyType.siddhiCEPEngine.name(); - } - - @Override - public Class getPolicyEvaluator() { - return SiddhiPolicyEvaluator.class; - } - - @Override - public List<Module> getBindingModules() { - Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(SiddhiPolicyDefinition.class, getPolicyType())); - return Arrays.asList(module1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java ---------------------------------------------------------------------- diff 
--git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java deleted file mode 100644 index 43422f8..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiQueryCallbackImpl.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.policy.siddhi; - -import com.typesafe.config.Config; -import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity; -import org.apache.eagle.policy.PolicyEvaluationContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wso2.siddhi.core.event.Event; -import org.wso2.siddhi.core.query.output.callback.QueryCallback; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Siddhi call back implementation - * - * @param <T> - The policy definition type - * @param <K> - K the alert entity type - */ -public class SiddhiQueryCallbackImpl<T extends AbstractPolicyDefinitionEntity, K> extends QueryCallback{ - - private static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class); - - private final Config config; - private final PolicyEvaluationContext<T, K> siddhiEvaluateContext; - - public SiddhiQueryCallbackImpl(Config config, PolicyEvaluationContext<T, K> siddhiContext) { - this.config = config; - this.siddhiEvaluateContext = siddhiContext; - } - - public static List<String> convertToString(List<Object> data) { - List<String> rets = new ArrayList<String>(); - for (Object object : data) { - String value = null; - if (object instanceof Double) { - value = String.valueOf((Double)object); - } - else if (object instanceof Integer) { - value = String.valueOf((Integer)object); - } - else if (object instanceof Long) { - value = String.valueOf((Long)object); - } - else if (object instanceof String) { - value = (String)object; - } - else if (object instanceof Boolean) { - value = String.valueOf((Boolean)object); - } - rets.add(value); - } - return rets; - } - - public static List<Object> getOutputObject(Object[] data) { - List<Object> rets = new ArrayList<>(data.length); -// boolean isFirst = true; - for (Object object : data) { -// // The first field is siddhiAlertContext, skip it -// if (isFirst) { -// isFirst = false; -// continue; -// } - rets.add(object); - } - 
return rets; - } - - @SuppressWarnings("unchecked") - @Override - public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) { - List<Object> rets = getOutputObject(inEvents[0].getData()); - K alert = siddhiEvaluateContext.resultRender.render(config, rets, siddhiEvaluateContext, timeStamp); - SiddhiEvaluationHandler<T, K> handler = siddhiEvaluateContext.alertExecutor; - handler.onEvalEvents(siddhiEvaluateContext, Arrays.asList(alert)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java deleted file mode 100644 index e4c3481..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.policy.siddhi; - -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.SortedMap; - -/** - * convert metadata entities for a stream to stream definition for siddhi cep engine - * define stream HeapUsage (metric string, host string, value double, timestamp long) - */ -public class SiddhiStreamMetadataUtils { - private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class); - - public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext"; - - public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) { - SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName); - if(map == null || map.size() == 0){ - throw new IllegalStateException("Alert stream schema ["+streamName+"] should never be empty"); - } - return map; - } - - /** - * @see org.wso2.siddhi.query.api.definition.Attribute.Type - * make sure StreamMetadataManager.init is invoked before this method - * @param streamName - * @return - */ - public static String convertToStreamDef(String streamName){ - SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName); - StringBuilder sb = new StringBuilder(); -// sb.append(EAGLE_ALERT_CONTEXT_FIELD + " object, "); - for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){ - appendAttributeNameType(sb, entry.getKey(), entry.getValue().getAttrType()); - } - if(sb.length() > 0){ - sb.deleteCharAt(sb.length()-1); - } - - String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");"; - return String.format(siddhiStreamDefFormat, sb.toString()); - } - - public static String convertToStreamDef(String streamName, Map<String, String> eventSchema){ - StringBuilder sb = new StringBuilder(); - sb.append("context" + " object,"); - for(Map.Entry<String, String> 
entry : eventSchema.entrySet()){ - appendAttributeNameType(sb, entry.getKey(), entry.getValue()); - } - if(sb.length() > 0){ - sb.deleteCharAt(sb.length()-1); - } - - String siddhiStreamDefFormat = "define stream " + streamName + "(" + "%s" + ");"; - return String.format(siddhiStreamDefFormat, sb.toString()); - } - - private static void appendAttributeNameType(StringBuilder sb, String attrName, String attrType){ - sb.append(attrName); - sb.append(" "); - if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){ - sb.append("string"); - }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){ - sb.append("int"); - }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){ - sb.append("long"); - }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){ - sb.append("bool"); - }else if(attrType.equalsIgnoreCase(AttributeType.FLOAT.name())){ - sb.append("float"); - }else if(attrType.equalsIgnoreCase(AttributeType.DOUBLE.name())){ - sb.append("double"); - }else{ - LOG.warn("AttrType is not recognized, ignore : " + attrType); - } - sb.append(","); - } - - public static Object getAttrDefaultValue(String streamName, String attrName){ - SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName); - AlertStreamSchemaEntity entity = map.get(attrName); - if (entity.getDefaultValue() != null) { - return entity.getDefaultValue(); - } - else { - String attrType = entity.getAttrType(); - if (attrType.equalsIgnoreCase(AttributeType.STRING.name())) { - return "NA"; - } else if (attrType.equalsIgnoreCase(AttributeType.INTEGER.name()) || attrType.equalsIgnoreCase(AttributeType.LONG.name())) { - return -1; - } else if (attrType.equalsIgnoreCase(AttributeType.BOOL.name())) { - return true; - } else { - LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string"); - return "N/A"; - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java deleted file mode 100644 index 83d30e0..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/StreamMetadataManager.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.policy.siddhi; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.apache.eagle.policy.common.Constants; -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import com.typesafe.config.Config; -import org.apache.eagle.policy.dao.AlertStreamSchemaDAO; -import org.apache.eagle.common.config.EagleConfigConstants; -import org.apache.commons.collections.map.UnmodifiableMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * centralized memory where all stream metadata sit on, it is not mutable data - */ -public class StreamMetadataManager { - private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class); - - private static StreamMetadataManager instance = new StreamMetadataManager(); - private Map<String, List<AlertStreamSchemaEntity>> map = new HashMap<String, List<AlertStreamSchemaEntity>>(); - private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>(); - private volatile boolean initialized = false; - - private StreamMetadataManager(){ - } - - public static StreamMetadataManager getInstance(){ - return instance; - } - - private void internalInit(Config config, AlertStreamSchemaDAO dao){ - try{ - String application = config.getString(EagleConfigConstants.EAGLE_PROPS + "." 
+ EagleConfigConstants.APPLICATION); - List<AlertStreamSchemaEntity> list = dao.findAlertStreamSchemaByApplication(application); - if(list == null) - return; - for (AlertStreamSchemaEntity entity : list) { - String streamName = entity.getTags().get(Constants.STREAM_NAME); - if (map.get(streamName) == null) { - map.put(streamName, new ArrayList<AlertStreamSchemaEntity>()); - map2.put(streamName, new TreeMap<String, AlertStreamSchemaEntity>()); - } - map.get(streamName).add(entity); - map2.get(streamName).put(entity.getTags().get(Constants.ATTR_NAME), entity); - } - }catch(Exception ex){ - LOG.error("Fail building metadata manger", ex); - throw new IllegalStateException(ex); - } - } - - /** - * singleton with init would be good for unit test as well, and it ensures that - * initialization happens only once before you use it. - * @param config - * @param dao - */ - public void init(Config config, AlertStreamSchemaDAO dao){ - if(!initialized){ - synchronized(this){ - if(!initialized){ - if(LOG.isDebugEnabled()) LOG.debug("Initializing ..."); - internalInit(config, dao); - initialized = true; - LOG.info("Successfully initialized"); - } - } - }else{ - LOG.info("Already initialized, skip"); - } - } - - // Only for unit test purpose - public void reset() { - synchronized (this) { - initialized = false; - map.clear(); - map2.clear(); - } - } - - private void ensureInitialized(){ - if(!initialized) - throw new IllegalStateException("StreamMetadataManager should be initialized before using it"); - } - - public List<AlertStreamSchemaEntity> getMetadataEntitiesForStream(String streamName){ - ensureInitialized(); - return getMetadataEntitiesForAllStreams().get(streamName); - } - - public Map<String, List<AlertStreamSchemaEntity>> getMetadataEntitiesForAllStreams(){ - ensureInitialized(); - return UnmodifiableMap.decorate(map); - } - - public SortedMap<String, AlertStreamSchemaEntity> getMetadataEntityMapForStream(String streamName){ - ensureInitialized(); - return 
getMetadataEntityMapForAllStreams().get(streamName); - } - - public Map<String, SortedMap<String, AlertStreamSchemaEntity>> getMetadataEntityMapForAllStreams(){ - ensureInitialized(); - return UnmodifiableMap.decorate(map2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider deleted file mode 100644 index eac2bfd..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java b/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java deleted file mode 100644 index c88d9bb..0000000 --- a/eagle-core/eagle-policy/eagle-policy-base/src/test/java/org/apache/eagle/policy/dao/TestSchemaDao.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.policy.dao; - -import org.apache.eagle.alert.entity.AlertStreamSchemaEntity; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.List; - -/** - * Created on 12/31/15. 
- */ -public class TestSchemaDao { - - @Ignore - @Test - public void test() throws Exception { - AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl("localhost", 9099, "admin", "secret"); - List<AlertStreamSchemaEntity> entities = dao.findAlertStreamSchemaByApplication("hdfsAuditLog"); - System.out.print(entities); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/pom.xml ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/pom.xml b/eagle-examples/eagle-topology-example/pom.xml deleted file mode 100644 index 5d4a419..0000000 --- a/eagle-examples/eagle-topology-example/pom.xml +++ /dev/null @@ -1,68 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <artifactId>eagle-examples</artifactId> - <groupId>org.apache.eagle</groupId> - <version>0.5.0-incubating-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>eagle-topology-example</artifactId> - <dependencies> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-stream-process-api</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.eagle</groupId> - <artifactId>eagle-query-base</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <resources> - <resource> - <directory>src/resources</directory> - </resource> - </resources> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <descriptor>src/assembly/eagle-topology-example-assembly.xml</descriptor> - <finalName>eagle-topology-example-${project.version}</finalName> - </configuration> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <tarLongFileMode>posix</tarLongFileMode> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml b/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml deleted file mode 100644 index 0acf619..0000000 --- a/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml +++ /dev/null @@ -1,63 
+0,0 @@ -<?xml version="1.0"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>assembly</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>false</useProjectArtifact> - <unpack>true</unpack> - <scope>runtime</scope> - <unpackOptions> - <excludes> - <exclude>**/application.conf</exclude> - <exclude>**/defaults.yaml</exclude> - <exclude>**/storm.yaml</exclude> - <exclude>**/storm.yaml.1</exclude> - <exclude>**/log4j.properties</exclude> - </excludes> - </unpackOptions> - <excludes> - <exclude>org.apache.storm:storm-core</exclude> - <exclude>org.slf4j:slf4j-api</exclude> - <exclude>org.slf4j:log4j-over-slf4j</exclude> - <exclude>org.slf4j:slf4j-log4j12</exclude> - <exclude>log4j:log4j</exclude> - <exclude>asm:asm</exclude> - <exclude>org.apache.log4j.wso2:log4j</exclude> - 
</excludes> - </dependencySet> - </dependencySets> - <fileSets> - <fileSet> - <directory>${project.build.outputDirectory}</directory> - <outputDirectory>/</outputDirectory> - <excludes> - <exclude>application.conf</exclude> - <exclude>log4j.properties</exclude> - <exclude>**/storm.yaml.1</exclude> - </excludes> - </fileSet> - </fileSets> -</assembly> http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java deleted file mode 100644 index a5e39a5..0000000 --- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.eagle.example.notificationplugin; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.TreeMap; - -/** - * Example entry point that feeds a fixed test event from {@link TestSpout} into a Storm execution environment and hands it to alertWithConsumer("testStream", "testExecutor"). - * - * Created on 2/16/16. - */ -public class NotificationPluginTestMain { - /** - * Sets the "config.resource" system property to "/application-plugintest.conf", wires the test spout (2 output fields, named "testSpout") into the environment and executes it. - * - * @param args not read - */ - public static void main(String[] args){ - System.setProperty("config.resource", "/application-plugintest.conf"); - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(); - env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).nameAs("testSpout").alertWithConsumer("testStream", "testExecutor"); - env.execute(); - } - - /** - * Builds a spout provider whose getSpout always returns a new {@link TestSpout}. - * - * @param config not used by this method - * @return the provider - */ - public static StormSpoutProvider createProvider(Config config) { - return new StormSpoutProvider(){ - - @Override - public BaseRichSpout getSpout(Config context) { - return new TestSpout(); - } - }; - } - - /** - * Spout that, on every nextTuple() call, waits via Utils.sleep(5000), logs, and emits the pair ("testStream", {"testAttribute": "testValue"}). - */ - public static class TestSpout extends BaseRichSpout { - private static final Logger LOG = LoggerFactory.getLogger(TestSpout.class); - private SpoutOutputCollector collector; - public TestSpout() { - } - - /** - * Declares no output fields. - * NOTE(review): main() configures withOutputFields(2), so the fields are presumably declared by the framework wrapper - confirm. - */ - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - } - - /** - * Stores the collector for later use by nextTuple(); conf and context are not read. - */ - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - } - - /** - * Sleeps, logs "emitted tuple ..." (the log line is written before the emit), then emits the stream name "testStream" together with a single-entry attribute map. - */ - @Override - public void nextTuple() { - Utils.sleep(5000); - LOG.info("emitted tuple ..."); - Map<String, Object> map = new TreeMap<>(); - map.put("testAttribute", "testValue"); - collector.emit(new Values("testStream", map)); - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java deleted file mode 100644 index 58dfe48..0000000 --- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.example.persist; - -import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer; - -/** - * Created on 1/4/16. 
- */ -public class MetricSerializer implements SpoutKafkaMessageDeserializer { - /** - * Placeholder implementation: decodes the payload into a String that is never used and always returns null. - * - * NOTE(review): new String(byte[]) uses the platform default charset and throws NullPointerException when arg0 is null - confirm the intended encoding and null handling before adding real parsing. - * - * @param arg0 bytes to decode - * @return always null - */ - @Override - public Object deserialize(byte[] arg0) { - String logLine = new String(arg0); - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java deleted file mode 100644 index 8f105fc..0000000 --- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.example.persist; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; -import com.typesafe.config.Config; -import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StorageType; -import org.apache.eagle.datastream.core.StreamProducer; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.partition.PartitionStrategy; - -import java.util.Arrays; -import java.util.Map; -import java.util.Random; - -/** - * Created on 1/4/16. - * - * This example demonstrates how a user can combine the aggregate and persist features, e.g. for metrics processing and storage. - * - */ -public class PersistTopoTestMain { - - /** - * Obtains the Storm execution environment, creates the static metric spout provider and runs the topology with the default partition strategy. - * - * @param args not read - */ - public static void main(String[] args) { -// System.setProperty("config.resource", "application.conf"); - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(); - StormSpoutProvider provider = createProvider(env.getConfig()); - execWithDefaultPartition(env, provider); - } - - /** - * Wires spout -> aggregate ("aggregateExecutor1" over "persistTestEventStream") -> persist ("persistExecutor1", Kafka storage) and executes the environment. The partition strategy always returns bucket 0. - * - * NOTE(review): the spout returned by createProvider declares and emits four fields while withOutputFields(2) is used here - confirm the intended field count. - * - * @param env the execution environment to build on and execute - * @param provider supplies the source spout - */ - @SuppressWarnings("unchecked") - public static void execWithDefaultPartition(StormExecutionEnvironment env, StormSpoutProvider provider) { - StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer"); - StreamProducer filter = source; - - // required : persistTestEventStream schema be created in metadata manager - // required : policy for aggregateExecutor1 be created in metadata manager - StreamProducer aggregate = filter.aggregate(Arrays.asList("persistTestEventStream"), "aggregateExecutor1", new PartitionStrategy() { - @Override - public int balance(String key, int buckNum) { - return 0; - } - }); - - StreamProducer 
persist = aggregate.persist("persistExecutor1", StorageType.KAFKA()); - - env.execute(); - } - - /** - * Builds a spout provider whose getSpout always returns a new {@link StaticMetricSpout}. - * - * @param config not used by this method - * @return the provider - */ - public static StormSpoutProvider createProvider(Config config) { - - return new StormSpoutProvider(){ - - @Override - public BaseRichSpout getSpout(Config context) { - return new StaticMetricSpout(); - } - }; - } - - /** - * Spout emitting synthetic (timestamp, host, cpu, mem) tuples; the timestamp starts at construction time and advances by 100 per nextTuple() call. - */ - public static class StaticMetricSpout extends BaseRichSpout { - - private long base; - private SpoutOutputCollector collector; - - public StaticMetricSpout() { - base = System.currentTimeMillis(); - } - - private Random cpuRandom = new Random(); - private Random memRandom = new Random(); - /** - * Scale in bytes applied to the Gaussian sample in nextTuple(): 512 GiB. The long literal forces 64-bit arithmetic; the all-int product 512 * 1024 * 1024 * 1024 wraps to 0, which made every emitted mem value 0. - * NOTE(review): the previous comment called this a "16g" upper bound, which does not match the 512 multiplier - confirm the intended size. - */ - private static final long FULL_MEM_SIZE_BYTES = 512L * 1024 * 1024 * 1024; - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("timestamp", "host", "cpu", "mem")); - } - - @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { - this.collector = collector; - } - - /** - * Sleeps via Utils.sleep(100), advances the timestamp by a fixed step of 100 and emits one tuple with a random cpu in [0, 100) and a Gaussian-distributed mem value (which can be negative, because nextGaussian() is centred on 0). - */ - @Override - public void nextTuple() { - Utils.sleep(100); - base = base + 100;// with fix steps.. 
- long mem = Double.valueOf(memRandom.nextGaussian() * FULL_MEM_SIZE_BYTES).longValue(); - collector.emit(new Values(base, "host", cpuRandom.nextInt(100), mem)); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java ---------------------------------------------------------------------- diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java deleted file mode 100644 index d9a7bbb..0000000 --- a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.example.persist; - -import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider; -import org.apache.eagle.datastream.ExecutionEnvironments; -import org.apache.eagle.datastream.core.StorageType; -import org.apache.eagle.datastream.core.StreamProducer; -import org.apache.eagle.datastream.storm.StormExecutionEnvironment; -import org.apache.eagle.partition.PartitionStrategy; - -import java.util.Arrays; - -/** - * Variant of the persist example that passes an inline CQL statement to aggregate(...) and reuses the spout provider from PersistTopoTestMain. - * - * Created on 1/10/16. - */ -public class PersistTopoTestMain2 { - - /** - * Sets the "config.resource" system property to "application.conf", obtains the Storm environment, creates the spout provider via PersistTopoTestMain.createProvider and runs exec. - * - * @param args not read - */ - public static void main(String[] args) { - System.setProperty("config.resource", "application.conf");// customize the application.conf - StormExecutionEnvironment env = ExecutionEnvironments.getStorm(); - StormSpoutProvider provider = PersistTopoTestMain.createProvider(env.getConfig()); - exec(env, provider); - } - - /** - * Wires spout (4 output fields) -> aggregate (inline CQL over a 10 min external-time window selecting min(timestamp), avg(cpu) and avg(mem)) -> persist ("persistExecutor1", Kafka storage) and executes the environment. The partition strategy always returns bucket 0. - * - * @param env the execution environment to build on and execute - * @param provider supplies the source spout - */ - private static void exec(StormExecutionEnvironment env, StormSpoutProvider provider) { - StreamProducer source = env.fromSpout(provider).withOutputFields(4).nameAs("kafkaMsgConsumer"); - StreamProducer filter = source; - - // "timestamp", "host", "cpu", "mem" - String cql = " define stream eagleQuery(eagleAlertContext object, timestamp long, host string, cpu int, mem long);" - + " @info(name='query')" - + " from eagleQuery#window.externalTime(timestamp, 10 min) " - + " select eagleAlertContext, min(timestamp) as starttime, avg(cpu) as avgCpu, avg(mem) as avgMem insert into tmp;"; - /* NOTE(review): "ealgeQuery" below does not match the "eagleQuery" stream defined in the CQL above; it looks like a transposition typo - confirm before relying on this example. */ - StreamProducer aggregate = filter.aggregate(Arrays.asList("ealgeQuery"), cql, new PartitionStrategy() { - @Override - public int balance(String key, int buckNum) { - return 0; - } - }); - - StreamProducer persist = aggregate.persist("persistExecutor1", StorageType.KAFKA()); - - env.execute(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh ---------------------------------------------------------------------- diff --git 
a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh deleted file mode 100644 index e9288fa..0000000 --- a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# POSTs the "pluginTestPolicy" alert definition (executor "testExecutor", stream "testStream") to the Eagle service. -# The EAGLE_SERVICE_* variables are presumably set by eagle-env.sh - verify. - -# Expansions are quoted so that a checkout path containing spaces is not word-split. -CUR_DIR=$(dirname "$0") -source "$CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh" - -##### import plugin-test policy (with its notification definitions) ########## -echo "" -echo "Importing policy ... 
" -curl -u "${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD}" -X POST -H 'Content-Type:application/json' \ - "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \ - -d ' - [ - { - "prefix": "alertdef", - "tags": { - "site": "sandbox", - "dataSource": "testSpout", - "policyId": "pluginTestPolicy", - "alertExecutorId": "testExecutor", - "policyType": "siddhiCEPEngine" - }, - "description": "pluginTest", - "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}", - "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"subject\":\"just for test\",\"sender\":\"[email protected]\",\"recipients\":\"[email protected]\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]", - "remediationDef":"", - "enabled":true - } - ] - ' - -## Finished -echo "" -echo "Finished initialization for NotificationPluginTest" - -exit 0
