Repository: incubator-eagle Updated Branches: refs/heads/master f833e9831 -> 9f4e7633d
[EAGLE-824] Multiple policies in one alert bolt produces duplicated tuples Multiple policies in one alert bolt will cause each policy in this bolt produce the tuple and emit tuple into publisher, the publisher will got multiple duplicated tuples. Author: Xiancheng Li <xiancheng...@ebay.com> Closes #714 from garrettlish/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/9f4e7633 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/9f4e7633 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/9f4e7633 Branch: refs/heads/master Commit: 9f4e7633d939580ce640d7488ec1fdeb07c00c3e Parents: f833e98 Author: Xiancheng Li <xiancheng...@ebay.com> Authored: Tue Dec 6 12:58:59 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Tue Dec 6 12:58:59 2016 +0800 ---------------------------------------------------------------------- .../evaluator/impl/AlertBoltOutputCollectorWrapper.java | 4 ++++ .../org/apache/eagle/alert/engine/runner/AlertBolt.java | 11 ++++++----- .../eagle/alert/engine/router/CustomizedHandler.java | 6 +++++- .../org/apache/eagle/app/stream/CEPFunctionTest.java | 2 +- 4 files changed, 16 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index 3053e6e..cffb706 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -53,7 +53,11 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { public void emit(AlertStreamEvent event) { Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); for (PublishPartition publishPartition : clonedPublishPartitions) { + // skip the publish partition which is not belong to this policy PublishPartition cloned = publishPartition.clone(); + if (!cloned.getPolicyId().equalsIgnoreCase(event.getPolicyId())) { + continue; + } for (String column : cloned.getColumns()) { int columnIndex = event.getSchema().getColumnIndex(column); if (columnIndex < 0) { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index 02bc47e..7d66f47 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -198,16 +198,16 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds); // update alert output collector - Set<PublishPartition> tempPublishPartitions = new HashSet<>(); + Set<PublishPartition> newPublishPartitions = new HashSet<>(); spec.getPublishPartitions().forEach(p -> { if (newPolicies.stream().filter(o -> o.getName().equals(p.getPolicyId())).count() > 0) { - tempPublishPartitions.add(p); + newPublishPartitions.add(p); } }); - Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(tempPublishPartitions, cachedPublishPartitions); - Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions, tempPublishPartitions); - Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(tempPublishPartitions, cachedPublishPartitions); + Collection<PublishPartition> addedPublishPartitions = CollectionUtils.subtract(newPublishPartitions, cachedPublishPartitions); + Collection<PublishPartition> removedPublishPartitions = CollectionUtils.subtract(cachedPublishPartitions, newPublishPartitions); + Collection<PublishPartition> modifiedPublishPartitions = CollectionUtils.intersection(newPublishPartitions, cachedPublishPartitions); LOG.debug("added PublishPartition " + addedPublishPartitions); LOG.debug("removed PublishPartition " + removedPublishPartitions); @@ -217,6 +217,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen // switch cachedPolicies = newPoliciesMap; + cachedPublishPartitions = newPublishPartitions; sdf = sds; specVersion = spec.getVersion(); this.spec = spec; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java index be69ffb..4d124e1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java @@ -30,6 +30,7 @@ import java.util.Map; */ public class CustomizedHandler implements PolicyStreamHandler { private Collector<AlertStreamEvent> collector; + private PolicyHandlerContext context; public CustomizedHandler(Map<String, StreamDefinition> sds) { } @@ -37,11 +38,14 @@ public class CustomizedHandler implements PolicyStreamHandler { @Override public void prepare(Collector<AlertStreamEvent> collector, PolicyHandlerContext context) throws Exception { this.collector = collector; + this.context = context; } @Override public void send(StreamEvent event) throws Exception { - this.collector.emit(new AlertStreamEvent()); + AlertStreamEvent alert = new AlertStreamEvent(); + alert.setPolicyId(context.getPolicyDefinition().getName()); + this.collector.emit(alert); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/9f4e7633/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java index a8613df..039c087 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java +++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/stream/CEPFunctionTest.java @@ -54,7 +54,7 @@ public class CEPFunctionTest { put("name","cpu.usage"); put("value", 0.96); }}); - Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(5, TimeUnit.SECONDS)); + Assert.assertTrue("Should get result in 5 s", semaphore.tryAcquire(15, TimeUnit.SECONDS)); function.close(); } }