Repository: incubator-eagle Updated Branches: refs/heads/master 7a2255670 -> 852bac94e
EAGLE-684: AlertEngine : PolicyDefinition.Defintion should not be changed. When introduced stated policy definition in one policy, there is a logic to set the PolicyDefinition.Defintion's input and output stream while create the policy handler. This behavior change the policy definition itself, thus cause the metadata reload incorrectly close and recreate policy evaluation handler. Author: ralphsu This closes #561 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/852bac94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/852bac94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/852bac94 Branch: refs/heads/master Commit: 852bac94eba470527207398893c687c0610d80d8 Parents: 7a22556 Author: Ralph, Su <suliang...@gmail.com> Authored: Tue Oct 25 20:25:08 2016 +0800 Committer: Ralph, Su <suliang...@gmail.com> Committed: Tue Oct 25 20:26:54 2016 +0800 ---------------------------------------------------------------------- .../evaluator/impl/SiddhiDefinitionAdapter.java | 14 ++++++-------- .../evaluator/impl/SiddhiPolicyStateHandler.java | 3 ++- 2 files changed, 8 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/852bac94/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java index 114c40a..a732e66 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiDefinitionAdapter.java @@ -16,11 +16,11 @@ */ package org.apache.eagle.alert.engine.evaluator.impl; +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wso2.siddhi.query.api.definition.AbstractDefinition; @@ -84,14 +84,12 @@ public class SiddhiDefinitionAdapter { StringBuilder builder = new StringBuilder(); PolicyDefinition.Definition coreDefinition = policyDefinition.getDefinition(); // init if not present - if (coreDefinition.getInputStreams() == null || coreDefinition.getInputStreams().isEmpty()) { - coreDefinition.setInputStreams(policyDefinition.getInputStreams()); - } - if (coreDefinition.getOutputStreams() == null || coreDefinition.getOutputStreams().isEmpty()) { - coreDefinition.setOutputStreams(policyDefinition.getOutputStreams()); + List<String> inputStreams = coreDefinition.getInputStreams(); + if (inputStreams == null || inputStreams.isEmpty()) { + inputStreams = policyDefinition.getInputStreams(); } - for (String inputStream : coreDefinition.getInputStreams()) { + for (String inputStream : inputStreams) { builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); builder.append("\n"); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/852bac94/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java index 02b8e8c..141c819 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyStateHandler.java @@ -40,7 +40,8 @@ public class SiddhiPolicyStateHandler extends SiddhiPolicyHandler { protected String generateExecutionPlan(PolicyDefinition policyDefinition, Map<String, StreamDefinition> sds) throws StreamNotDefinedException { StringBuilder builder = new StringBuilder(); PolicyDefinition.Definition stateDefiniton = policyDefinition.getStateDefinition(); - for (String inputStream : stateDefiniton.getInputStreams()) { // the state stream follow the output stream of the policy definition + List<String> inputStreams = stateDefiniton.getInputStreams(); + for (String inputStream : inputStreams) { // the state stream follow the output stream of the policy definition builder.append(SiddhiDefinitionAdapter.buildStreamDefinition(sds.get(inputStream))); builder.append("\n"); }