Repository: incubator-eagle
Updated Branches:
  refs/heads/master f833e9831 -> 9f4e7633d


[EAGLE-824] Multiple policies in one alert bolt produces duplicated tuples

Multiple policies in one alert bolt will cause each policy in this bolt produce 
the tuple and emit tuple into publisher, the publisher will got multiple 
duplicated tuples.

Author: Xiancheng Li <xiancheng...@ebay.com>

Closes #714 from garrettlish/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9f4e7633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9f4e7633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9f4e7633

Branch: refs/heads/master
Commit: 9f4e7633d939580ce640d7488ec1fdeb07c00c3e
Parents: f833e98
Author: Xiancheng Li <xiancheng...@ebay.com>
Authored: Tue Dec 6 12:58:59 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Tue Dec 6 12:58:59 2016 +0800

----------------------------------------------------------------------
 .../evaluator/impl/AlertBoltOutputCollectorWrapper.java  |  4 ++++
 .../org/apache/eagle/alert/engine/runner/AlertBolt.java  | 11 ++++++-----
 .../eagle/alert/engine/router/CustomizedHandler.java     |  6 +++++-
 .../org/apache/eagle/app/stream/CEPFunctionTest.java     |  2 +-
 4 files changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
index 3053e6e..cffb706 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java
@@ -53,7 +53,11 @@ public class AlertBoltOutputCollectorWrapper implements 
AlertStreamCollector {
     public void emit(AlertStreamEvent event) {
         Set<PublishPartition> clonedPublishPartitions = new 
HashSet<>(publishPartitions);
         for (PublishPartition publishPartition : clonedPublishPartitions) {
+            // skip the publish partition which is not belong to this policy
             PublishPartition cloned = publishPartition.clone();
+            if (!cloned.getPolicyId().equalsIgnoreCase(event.getPolicyId())) {
+                continue;
+            }
             for (String column : cloned.getColumns()) {
                 int columnIndex = event.getSchema().getColumnIndex(column);
                 if (columnIndex < 0) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index 02bc47e..7d66f47 100755
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -198,16 +198,16 @@ public class AlertBolt extends AbstractStreamBolt 
implements AlertBoltSpecListen
         policyGroupEvaluator.onPolicyChange(comparator.getAdded(), 
comparator.getRemoved(), comparator.getModified(), sds);
 
         // update alert output collector
-        Set<PublishPartition> tempPublishPartitions = new HashSet<>();
+        Set<PublishPartition> newPublishPartitions = new HashSet<>();
         spec.getPublishPartitions().forEach(p -> {
             if (newPolicies.stream().filter(o -> 
o.getName().equals(p.getPolicyId())).count() > 0) {
-                tempPublishPartitions.add(p);
+                newPublishPartitions.add(p);
             }
         });
 
-        Collection<PublishPartition> addedPublishPartitions = 
CollectionUtils.subtract(tempPublishPartitions, cachedPublishPartitions);
-        Collection<PublishPartition> removedPublishPartitions = 
CollectionUtils.subtract(cachedPublishPartitions, tempPublishPartitions);
-        Collection<PublishPartition> modifiedPublishPartitions = 
CollectionUtils.intersection(tempPublishPartitions, cachedPublishPartitions);
+        Collection<PublishPartition> addedPublishPartitions = 
CollectionUtils.subtract(newPublishPartitions, cachedPublishPartitions);
+        Collection<PublishPartition> removedPublishPartitions = 
CollectionUtils.subtract(cachedPublishPartitions, newPublishPartitions);
+        Collection<PublishPartition> modifiedPublishPartitions = 
CollectionUtils.intersection(newPublishPartitions, cachedPublishPartitions);
 
         LOG.debug("added PublishPartition " + addedPublishPartitions);
         LOG.debug("removed PublishPartition " + removedPublishPartitions);
@@ -217,6 +217,7 @@ public class AlertBolt extends AbstractStreamBolt 
implements AlertBoltSpecListen
 
         // switch
         cachedPolicies = newPoliciesMap;
+        cachedPublishPartitions = newPublishPartitions;
         sdf = sds;
         specVersion = spec.getVersion();
         this.spec = spec;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
index be69ffb..4d124e1 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java
@@ -30,6 +30,7 @@ import java.util.Map;
  */
 public class CustomizedHandler implements PolicyStreamHandler {
     private Collector<AlertStreamEvent> collector;
+    private PolicyHandlerContext context;
 
     public CustomizedHandler(Map<String, StreamDefinition> sds) {
     }
@@ -37,11 +38,14 @@ public class CustomizedHandler implements 
PolicyStreamHandler {
     @Override
     public void prepare(Collector<AlertStreamEvent> collector, 
PolicyHandlerContext context) throws Exception {
         this.collector = collector;
+        this.context = context;
     }
 
     @Override
     public void send(StreamEvent event) throws Exception {
-        this.collector.emit(new AlertStreamEvent());
+       AlertStreamEvent alert = new AlertStreamEvent();
+       alert.setPolicyId(context.getPolicyDefinition().getName());
+        this.collector.emit(alert);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
index a8613df..039c087 100644
--- 
a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
+++ 
b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java
@@ -54,7 +54,7 @@ public class CEPFunctionTest {
             put("name","cpu.usage");
             put("value", 0.96);
         }});
-        Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(5, 
TimeUnit.SECONDS));
+        Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(15, 
TimeUnit.SECONDS));
         function.close();
     }
 }

Reply via email to