Repository: incubator-eagle Updated Branches: refs/heads/master 70600b260 -> dcefa2068
[EAGLE-804] make interface StreamContext more abstract - Add interface StreamCounter to abstract count https://issues.apache.org/jira/browse/EAGLE-804 Author: r7raul1984 <tangji...@yhd.com> Closes #693 from r7raul1984/EAGLE-804. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/dcefa206 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/dcefa206 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/dcefa206 Branch: refs/heads/master Commit: dcefa206835542c187059878d2ffbae50c1df1a3 Parents: 70600b2 Author: r7raul1984 <tangji...@yhd.com> Authored: Tue Nov 29 11:08:33 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Tue Nov 29 11:08:33 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/StormMultiCountMetric.java | 26 ++++++++++++++++++++ .../eagle/alert/engine/StreamContext.java | 3 +-- .../eagle/alert/engine/StreamContextImpl.java | 6 ++--- .../eagle/alert/engine/StreamCounter.java | 9 +++++++ .../engine/evaluator/PolicyHandlerContext.java | 8 +++--- .../impl/AlertBoltOutputCollectorWrapper.java | 2 +- .../evaluator/impl/AlertStreamCallback.java | 2 +- .../impl/PolicyGroupEvaluatorImpl.java | 10 ++++---- .../evaluator/impl/SiddhiPolicyHandler.java | 6 ++--- .../impl/StreamRouterBoltOutputCollector.java | 12 ++++----- .../engine/router/impl/StreamRouterImpl.java | 8 +++--- .../eagle/alert/engine/runner/AlertBolt.java | 8 +++--- .../alert/engine/runner/StreamRouterBolt.java | 4 +-- .../SiddhiCEPPolicyEventHandlerTest.java | 5 ++-- 14 files changed, 72 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java new file mode 100644 index 0000000..aa97b57 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StormMultiCountMetric.java @@ -0,0 +1,26 @@ +package org.apache.eagle.alert.engine; + +import backtype.storm.metric.api.MultiCountMetric; + +public class StormMultiCountMetric implements StreamCounter { + private MultiCountMetric countMetric; + + public StormMultiCountMetric(MultiCountMetric counter) { + this.countMetric = counter; + } + + @Override + public void incr(String scopeName) { + countMetric.scope(scopeName).incr(); + } + + @Override + public void incrBy(String scopeName, int length) { + countMetric.scope(scopeName).incrBy(length); + } + + @Override + public void scope(String scopeName) { + countMetric.scope(scopeName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java index a03932f..bafba83 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java @@ -1,6 +1,5 @@ package org.apache.eagle.alert.engine; -import backtype.storm.metric.api.MultiCountMetric; import com.typesafe.config.Config; /** @@ -20,7 +19,7 @@ import com.typesafe.config.Config; * limitations under the License. */ public interface StreamContext { - MultiCountMetric counter(); + StreamCounter counter(); Config config(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java index d02028a..e77a41b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java @@ -22,15 +22,15 @@ import com.typesafe.config.Config; public class StreamContextImpl implements StreamContext { private final Config config; - private final MultiCountMetric counter; + private final StreamCounter counter; public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) { - this.counter = counter; + this.counter = new StormMultiCountMetric(counter); this.config = config; } @Override - public MultiCountMetric counter() { + public StreamCounter counter() { return this.counter; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java new file mode 100644 index 0000000..ff96c30 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamCounter.java @@ -0,0 +1,9 @@ +package org.apache.eagle.alert.engine; + +public interface StreamCounter { + void incr(String scopeName); + + void incrBy(String scopeName, int length); + + void scope(String scopeName); +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java index 49f7eed..59d9e1f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/PolicyHandlerContext.java @@ -17,14 +17,14 @@ package org.apache.eagle.alert.engine.evaluator; +import org.apache.eagle.alert.engine.StreamCounter; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import backtype.storm.metric.api.MultiCountMetric; import com.typesafe.config.Config; public class PolicyHandlerContext { private PolicyDefinition policyDefinition; private PolicyGroupEvaluator policyEvaluator; - private MultiCountMetric policyCounter; + private StreamCounter policyCounter; private String policyEvaluatorId; private Config config; @@ -44,11 +44,11 @@ public class PolicyHandlerContext { this.policyEvaluator = policyEvaluator; } - public void setPolicyCounter(MultiCountMetric metric) { + public void setPolicyCounter(StreamCounter metric) { this.policyCounter = metric; } - public MultiCountMetric getPolicyCounter() { + public StreamCounter getPolicyCounter() { return policyCounter; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index b1be2da..af2b9f8 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -64,7 +64,7 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { } synchronized (outputLock) { - streamContext.counter().scope("alert_count").incr(); + streamContext.counter().incr("alert_count"); delegate.emit(Arrays.asList(cloned, event)); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java index ee1853c..6b6e0d5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertStreamCallback.java @@ -90,6 +90,6 @@ public class AlertStreamCallback extends StreamCallback { LOG.error(String.format("send event %s to index %d failed with exception. ", event, currentIndex), ex); } } - context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count")).incrBy(events.length); + context.getPolicyCounter().incrBy(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "alert_count"), events.length); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java index eed4b3b..9b1d76c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/PolicyGroupEvaluatorImpl.java @@ -57,7 +57,7 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { } public void nextEvent(PartitionedEvent event) { - this.context.counter().scope("receive_count").incr(); + this.context.counter().incr("receive_count"); dispatch(event); } @@ -87,19 +87,19 @@ public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator { if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) { try { handled = true; - this.context.counter().scope("eval_count").incr(); + this.context.counter().incr("eval_count"); policyStreamHandler.getValue().send(partitionedEvent.getEvent()); } catch (Exception e) { - this.context.counter().scope("fail_count").incr(); + this.context.counter().incr("fail_count"); LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e); } } } if (!handled) { - this.context.counter().scope("drop_count").incr(); + this.context.counter().incr("drop_count"); LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent); } else { - this.context.counter().scope("accept_count").incr(); + this.context.counter().incr("accept_count"); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java index c668935..72aca06 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/SiddhiPolicyHandler.java @@ -88,18 +88,18 @@ public class SiddhiPolicyHandler implements PolicyStreamHandler { } public void send(StreamEvent event) throws Exception { - context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count")).incr(); + context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "receive_count")); String streamId = event.getStreamId(); InputHandler inputHandler = executionRuntime.getInputHandler(streamId); if (inputHandler != null) { - context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count")).incr(); + context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "eval_count")); inputHandler.send(event.getTimestamp(), event.getData()); if (LOG.isDebugEnabled()) { LOG.debug("sent event to siddhi stream {} ", streamId); } } else { - context.getPolicyCounter().scope(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count")).incr(); + context.getPolicyCounter().incr(String.format("%s.%s", this.context.getPolicyDefinition().getName(), "drop_count")); LOG.warn("No input handler found for stream {}", streamId); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java index 3a53b44..77e8daa 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterBoltOutputCollector.java @@ -68,7 +68,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto public void emit(PartitionedEvent event) { try { - this.streamContext.counter().scope("send_count").incr(); + this.streamContext.counter().incr("send_count"); StreamPartition partition = event.getPartition(); List<StreamRouterSpec> routerSpecs = routeSpecMap.get(partition); if (routerSpecs == null || routerSpecs.size() <= 0) { @@ -83,7 +83,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto if (routePartitionerMap.get(partition) == null) { LOG.error("Partitioner for " + routerSpecs.get(0) + " is null"); synchronized (outputLock) { - this.streamContext.counter().scope("fail_count").incr(); + this.streamContext.counter().incr("fail_count"); this.outputCollector.fail(event.getAnchor()); } return; @@ -111,9 +111,9 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto } else { outputCollector.emit(targetStreamId, event.getAnchor(), Collections.singletonList(serializer.serialize(emittedEvent))); } - this.streamContext.counter().scope("emit_count").incr(); + this.streamContext.counter().incr("emit_count"); } catch (RuntimeException ex) { - this.streamContext.counter().scope("fail_count").incr(); + this.streamContext.counter().incr("fail_count"); LOG.error("Failed to emit to {} with {}", targetStreamId, newEvent, ex); throw ex; } @@ -124,7 +124,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto } catch (Exception ex) { LOG.error(ex.getMessage(), ex); synchronized (outputLock) { - this.streamContext.counter().scope("fail_count").incr(); + this.streamContext.counter().incr("fail_count"); this.outputCollector.fail(event.getAnchor()); } } @@ -217,7 +217,7 @@ public class StreamRouterBoltOutputCollector implements PartitionedEventCollecto @Override public void drop(PartitionedEvent event) { synchronized (outputLock) { - this.streamContext.counter().scope("drop_count").incr(); + this.streamContext.counter().incr("drop_count"); if (event.getAnchor() != null) { this.outputCollector.ack(event.getAnchor()); } else { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java index 7b2a1de..41523cc 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/router/impl/StreamRouterImpl.java @@ -72,13 +72,13 @@ public class StreamRouterImpl implements StreamRouter { * @param event StreamEvent */ public void nextEvent(PartitionedEvent event) { - this.context.counter().scope("receive_count").incr(); + this.context.counter().incr("receive_count"); if (!dispatchToSortHandler(event)) { - this.context.counter().scope("direct_count").incr(); + this.context.counter().incr("direct_count"); // Pass through directly if no need to sort outputCollector.emit(event); } - this.context.counter().scope("sort_count").incr(); + this.context.counter().incr("sort_count"); // Update stream clock time if moving forward and trigger all tick listeners streamTimeClockManager.onTimeUpdate(event.getStreamId(), event.getTimestamp()); } @@ -96,7 +96,7 @@ public class StreamRouterImpl implements StreamRouter { if (sortHandler == null) { if (event.isSortRequired()) { LOG.warn("Stream sort handler required has not been loaded so emmit directly: {}", event); - this.context.counter().scope("miss_sort_count").incr(); + this.context.counter().incr("miss_sort_count"); } return false; } else { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index 627a218..c946fee 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -89,7 +89,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen @Override public void execute(Tuple input) { - this.streamContext.counter().scope("execute_count").incr(); + this.streamContext.counter().incr("execute_count"); try { PartitionedEvent pe = deserialize(input.getValueByField(AlertConstants.FIELD_0)); if (logEventEnabled) { @@ -118,7 +118,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen LOG.warn(message); // send out metrics for meta conflict - this.streamContext.counter().scope("meta_conflict").incr(); + this.streamContext.counter().incr("meta_conflict"); ExecutorService executors = SingletonExecutor.getExecutorService(); executors.submit(() -> { @@ -144,11 +144,11 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen synchronized (outputLock) { this.collector.ack(input); } - this.streamContext.counter().scope("ack_count").incr(); + this.streamContext.counter().incr("ack_count"); } catch (Exception ex) { LOG.error(ex.getMessage(), ex); synchronized (outputLock) { - this.streamContext.counter().scope("fail_count").incr(); + this.streamContext.counter().incr("fail_count"); this.collector.fail(input); } } finally { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java index 6c39189..7acd7e4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java @@ -78,10 +78,10 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter @Override public void execute(Tuple input) { try { - this.streamContext.counter().scope("execute_count").incr(); + this.streamContext.counter().incr("execute_count"); this.router.nextEvent(deserialize(input.getValueByField(AlertConstants.FIELD_0)).withAnchor(input)); } catch (Exception ex) { - this.streamContext.counter().scope("fail_count").incr(); + this.streamContext.counter().incr("fail_count"); LOG.error(ex.getMessage(), ex); this.collector.fail(input); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/dcefa206/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java index 476d71f..89039f5 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/SiddhiCEPPolicyEventHandlerTest.java @@ -18,6 +18,7 @@ package org.apache.eagle.alert.engine.evaluator; import backtype.storm.metric.api.MultiCountMetric; import org.apache.eagle.alert.engine.Collector; +import org.apache.eagle.alert.engine.StormMultiCountMetric; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl; @@ -59,7 +60,7 @@ public class SiddhiCEPPolicyEventHandlerTest { PolicyDefinition policyDefinition = MockSampleMetadataFactory.createSingleMetricSamplePolicy(); PolicyHandlerContext context = new PolicyHandlerContext(); context.setPolicyDefinition(policyDefinition); - context.setPolicyCounter(new MultiCountMetric()); + context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric())); context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId")); handler.prepare(collector, context); StreamEvent event = StreamEvent.builder() @@ -104,7 +105,7 @@ public class SiddhiCEPPolicyEventHandlerTest { handler = new SiddhiPolicyHandler(ssd, 0); PolicyHandlerContext context = new PolicyHandlerContext(); context.setPolicyDefinition(policyDefinition); - context.setPolicyCounter(new MultiCountMetric()); + context.setPolicyCounter(new StormMultiCountMetric(new MultiCountMetric())); context.setPolicyEvaluator(new PolicyGroupEvaluatorImpl("evalutorId")); handler.prepare(collector, context);