Repository: eagle Updated Branches: refs/heads/master 7e41ed0b9 -> 906692058
[EAGLE-947] Publishers with same policy but different schema could produce duplicate alerts Assume that we have policy1 which have 2 kinds of output streams, one is stream1 and another is stream2. If publisher1 is configured for policy1 and stream1, and publisher2 is configured for policy1 and stream2, current code will produce 2 alerts for either stream1 or stream2. Author: Xiancheng Li <[email protected]> Closes #864 from garrettlish/master. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/90669205 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/90669205 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/90669205 Branch: refs/heads/master Commit: 9066920586174c2ba967fbd2434a21fa5b8353fa Parents: 7e41ed0 Author: Xiancheng Li <[email protected]> Authored: Tue Mar 14 15:22:03 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Tue Mar 14 15:22:03 2017 +0800 ---------------------------------------------------------------------- .../impl/AlertBoltOutputCollectorWrapper.java | 44 +++---- .../AlertBoltOutputCollectorWrapperTest.java | 120 +++++++++++++++++++ .../alert/engine/router/CustomizedHandler.java | 7 +- .../alert/engine/router/TestAlertBolt.java | 4 +- .../engine/statecheck/TestStateCheckPolicy.java | 4 +- 5 files changed, 151 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java index cffb706..606ddce 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapper.java @@ -16,19 +16,17 @@ */ package org.apache.eagle.alert.engine.evaluator.impl; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - import org.apache.eagle.alert.engine.AlertStreamCollector; import org.apache.eagle.alert.engine.StreamContext; import org.apache.eagle.alert.engine.coordinator.PublishPartition; +import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.router.StreamOutputCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; + public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { @@ -53,24 +51,26 @@ public class AlertBoltOutputCollectorWrapper implements AlertStreamCollector { public void emit(AlertStreamEvent event) { Set<PublishPartition> clonedPublishPartitions = new HashSet<>(publishPartitions); for (PublishPartition publishPartition : clonedPublishPartitions) { - // skip the publish partition which is not belong to this policy + // skip the publish partition which is not belong to this policy and also check streamId PublishPartition cloned = publishPartition.clone(); - if (!cloned.getPolicyId().equalsIgnoreCase(event.getPolicyId())) { - continue; - } - for (String column : cloned.getColumns()) { - int columnIndex = event.getSchema().getColumnIndex(column); - if (columnIndex < 0) { - LOG.warn("Column {} is not found in stream {}", column, cloned.getStreamId()); - continue; - } - cloned.getColumnValues().add(event.getData()[columnIndex]); - } - - synchronized (outputLock) { - streamContext.counter().incr("alert_count"); - delegate.emit(Arrays.asList(cloned, event)); - } + Optional.ofNullable(event) + .filter(x -> x != null + && x.getSchema() != null + && cloned.getPolicyId().equalsIgnoreCase(x.getPolicyId()) + && (cloned.getStreamId().equalsIgnoreCase(x.getSchema().getStreamId()) + || cloned.getStreamId().equalsIgnoreCase(Publishment.STREAM_NAME_DEFAULT))) + .ifPresent(x -> { + cloned.getColumns().stream() + .filter(y -> event.getSchema().getColumnIndex(y) >= 0 + && event.getSchema().getColumnIndex(y) < event.getSchema().getColumns().size()) + .map(y -> event.getData()[event.getSchema().getColumnIndex(y)]) + .filter(y -> y != null) + .forEach(y -> cloned.getColumnValues().add(y)); + synchronized (outputLock) { + streamContext.counter().incr("alert_count"); + delegate.emit(Arrays.asList(cloned, event)); + } + }); } } http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java new file mode 100644 index 0000000..9febed5 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/evaluator/impl/AlertBoltOutputCollectorWrapperTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.evaluator.impl; + +import org.apache.eagle.alert.engine.StreamContext; +import org.apache.eagle.alert.engine.StreamCounter; +import org.apache.eagle.alert.engine.coordinator.PublishPartition; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.router.StreamOutputCollector; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; + +import static org.mockito.Mockito.*; + +public class AlertBoltOutputCollectorWrapperTest { + + private AlertBoltOutputCollectorWrapper alertBoltOutputCollectorWrapper; + + // mock objects + private StreamOutputCollector outputCollector; + private Object outputLock; + private StreamContext streamContext; + private StreamCounter streamCounter; + + private Set<PublishPartition> publishPartitions = new HashSet<>(); + + private static final String samplePublishId = "samplePublishId"; + private static final String samplePublishId2 = "samplePublishId2"; + private static final String samplePolicyId = "samplePolicyId"; + private static final String sampleStreamId = "sampleStreamId"; + private static final String sampleStreamId2 = "sampleStreamId2"; + + @Before + public void setUp() throws Exception { + outputCollector = mock(StreamOutputCollector.class); + outputLock = mock(Object.class); + streamContext = mock(StreamContext.class); + streamCounter = mock(StreamCounter.class); + alertBoltOutputCollectorWrapper = new AlertBoltOutputCollectorWrapper(outputCollector, outputLock, streamContext); + } + + @Before + public void tearDown() throws Exception { + alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(new HashSet<>(), publishPartitions, new HashSet<>()); + publishPartitions.clear(); + } + + @Test + public void testNormal() throws Exception { + doReturn(streamCounter).when(streamContext).counter(); + + publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId)); + publishPartitions.add(createPublishPartition(samplePublishId2, samplePolicyId, sampleStreamId2)); + alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(), new HashSet<>()); + + AlertStreamEvent event = new AlertStreamEvent(); + event.setPolicyId(samplePolicyId); + StreamDefinition sd = new StreamDefinition(); + sd.setStreamId(sampleStreamId); + sd.setColumns(new ArrayList<>()); + event.setSchema(sd); + + alertBoltOutputCollectorWrapper.emit(event); + + verify(streamCounter, times(1)).incr(anyString()); + verify(outputCollector, times(1)).emit(anyObject()); + } + + @Test + public void testExceptional() throws Exception { + doReturn(streamCounter).when(streamContext).counter(); + + publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId)); + publishPartitions.add(createPublishPartition(samplePublishId, samplePolicyId, sampleStreamId)); + alertBoltOutputCollectorWrapper.onAlertBoltSpecChange(publishPartitions, new HashSet<>(), new HashSet<>()); + + AlertStreamEvent event = new AlertStreamEvent(); + event.setPolicyId(samplePolicyId); + StreamDefinition sd = new StreamDefinition(); + sd.setStreamId(sampleStreamId); + sd.setColumns(new ArrayList<>()); + event.setSchema(sd); + + alertBoltOutputCollectorWrapper.emit(event); + + verify(streamCounter, times(1)).incr(anyString()); + verify(outputCollector, times(1)).emit(anyObject()); + } + + private PublishPartition createPublishPartition(String publishId, String policyId, String streamId) { + PublishPartition publishPartition = new PublishPartition(); + publishPartition.setPolicyId(policyId); + publishPartition.setStreamId(streamId); + publishPartition.setPublishId(publishId); + publishPartition.setColumns(new HashSet<>()); + return publishPartition; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java index 4d124e1..284abc4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/CustomizedHandler.java @@ -31,8 +31,10 @@ import java.util.Map; public class CustomizedHandler implements PolicyStreamHandler { private Collector<AlertStreamEvent> collector; private PolicyHandlerContext context; + private Map<String, StreamDefinition> sds; public CustomizedHandler(Map<String, StreamDefinition> sds) { + this.sds = sds; } @Override @@ -43,8 +45,9 @@ public class CustomizedHandler implements PolicyStreamHandler { @Override public void send(StreamEvent event) throws Exception { - AlertStreamEvent alert = new AlertStreamEvent(); - alert.setPolicyId(context.getPolicyDefinition().getName()); + AlertStreamEvent alert = new AlertStreamEvent(); + alert.setPolicyId(context.getPolicyDefinition().getName()); + alert.setSchema(sds.get(event.getStreamId())); this.collector.emit(alert); } http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java index 440f555..c9e09fd 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java @@ -364,7 +364,7 @@ public class TestAlertBolt { GeneralTopologyContext context = mock(GeneralTopologyContext.class); int taskId = 1; when(context.getComponentId(taskId)).thenReturn("comp1"); - when(context.getComponentOutputFields("comp1", "default")).thenReturn(new Fields("f0")); + when(context.getComponentOutputFields("comp1", TEST_STREAM)).thenReturn(new Fields("f0")); // case 1: bolt prepared but metadata not initialized (no bolt.onAlertBoltSpecChange) PartitionedEvent pe = new PartitionedEvent(); pe.setPartitionKey(1); @@ -377,7 +377,7 @@ public class TestAlertBolt { PartitionedEventSerializerImpl peSer = new PartitionedEventSerializerImpl(bolt); byte[] serializedEvent = peSer.serialize(pe); - return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, "default"); + return new TupleImpl(context, Collections.singletonList(serializedEvent), taskId, TEST_STREAM); } private StreamPartition createPartition() { http://git-wip-us.apache.org/repos/asf/eagle/blob/90669205/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java index f4d8303..d4fe01a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/statecheck/TestStateCheckPolicy.java @@ -58,7 +58,7 @@ public class TestStateCheckPolicy { @Override public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { verified.set(true); - Assert.assertEquals("perfmon_latency_stream", ((PublishPartition) tuple.get(0)).getStreamId()); + Assert.assertEquals("perfmon_latency_check_output2", ((PublishPartition) tuple.get(0)).getStreamId()); AlertStreamEvent event = (AlertStreamEvent) tuple.get(1); System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple)); return null; @@ -92,7 +92,7 @@ public class TestStateCheckPolicy { List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"), new TypeReference<List<StreamDefinition>>() { }); - spec.addPublishPartition(streams.get(0).getStreamId(), policies.get(0).getName(), "testPublishBolt", null); + spec.addPublishPartition("perfmon_latency_check_output2", policies.get(0).getName(), "testPublishBolt", null); alertBolt.onAlertBoltSpecChange(spec, definitionMap);
