[
https://issues.apache.org/jira/browse/EAGLE-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15744699#comment-15744699
]
ASF GitHub Bot commented on EAGLE-837:
--------------------------------------
Github user wujinhu commented on a diff in the pull request:
https://github.com/apache/incubator-eagle/pull/732#discussion_r92129224
--- Diff:
eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java
---
@@ -443,6 +447,83 @@ public void reportError(Throwable error) {
Assert.assertTrue(recieved.get());
}
+
+ @Test
+ public void testStreamDefinitionChange() throws IOException {
+ PolicyDefinition def = new PolicyDefinition();
+ def.setName("policy-definition");
+ def.setInputStreams(Arrays.asList(TEST_STREAM));
+
+ PolicyDefinition.Definition definition = new
PolicyDefinition.Definition();
+ definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE);
+
definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler");
+ definition.setValue("PT0M,plain,1,host,host1");
+ def.setDefinition(definition);
+ def.setPartitionSpec(Arrays.asList(createPartition()));
+
+ AlertBoltSpec boltSpecs = new AlertBoltSpec();
+
+ AtomicBoolean recieved = new AtomicBoolean(false);
+ OutputCollector collector = new OutputCollector(new
IOutputCollector() {
+ @Override
+ public List<Integer> emit(String streamId, Collection<Tuple>
anchors, List<Object> tuple) {
+ recieved.set(true);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void emitDirect(int taskId, String streamId,
Collection<Tuple> anchors, List<Object> tuple) {
+ }
+
+ @Override
+ public void ack(Tuple input) {
+ }
+
+ @Override
+ public void fail(Tuple input) {
+ }
+
+ @Override
+ public void reportError(Throwable error) {
+ }
+ });
+ AlertBolt bolt = createAlertBolt(collector);
+
+ boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(),
Arrays.asList(def));
+ boltSpecs.setVersion("spec_" + System.currentTimeMillis());
+ // stream def map
+ Map<String, StreamDefinition> sds = new HashMap();
+ StreamDefinition sdTest = new StreamDefinition();
+ sdTest.setStreamId(TEST_STREAM);
+ sds.put(sdTest.getStreamId(), sdTest);
+
+ boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition",
"testAlertPublish", null);
+
+ bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+ // how to assert
+ Tuple t = createTuple(bolt, boltSpecs.getVersion());
+
+ bolt.execute(t);
+
+ Assert.assertTrue(recieved.get());
+
+ LOG.info("Update stream");
+ sds = new HashMap();
+ sdTest = new StreamDefinition();
+ sdTest.setStreamId(TEST_STREAM);
+ sds.put(sdTest.getStreamId(), sdTest);
+ sdTest.setDescription("update the stream");
+ bolt.onAlertBoltSpecChange(boltSpecs, sds);
+
+ LOG.info("No any change");
+ sds = new HashMap();
+ sdTest = new StreamDefinition();
+ sdTest.setStreamId(TEST_STREAM);
+ sds.put(sdTest.getStreamId(), sdTest);
+ sdTest.setDescription("update the stream");
+ bolt.onAlertBoltSpecChange(boltSpecs, sds);
--- End diff --
It seems some tests have no asserts, does it ok?
> Stream definition change does not reflect in AlertBolt
> ------------------------------------------------------
>
> Key: EAGLE-837
> URL: https://issues.apache.org/jira/browse/EAGLE-837
> Project: Eagle
> Issue Type: Bug
> Reporter: Garrett Li
> Assignee: Garrett Li
>
> Stream definition change only trigger router bolt & publisher update, we
> don't update corresponding alert bolt stream definition references. It will
> cause alert bolt still use old stream definition references, it could produce
> array index out of bound exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)