Repository: incubator-eagle Updated Branches: refs/heads/master 743de7330 -> 7c6315311
EAGLE-837: Stream definition change is not reflected in AlertBolt. A stream definition change only triggers an update of the router bolt and the publisher; the corresponding stream definition references in the alert bolt are not updated. As a result, the alert bolt keeps using the old stream definition references, which can produce an array-index-out-of-bounds exception. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7c631531 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7c631531 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7c631531 Branch: refs/heads/master Commit: 7c6315311f2f08992773d175d80314c449c7e9b6 Parents: 743de73 Author: Xiancheng Li <xiancheng...@ebay.com> Authored: Mon Dec 12 19:17:47 2016 +0800 Committer: Ralph, Su <suliang...@gmail.com> Committed: Tue Dec 13 16:10:00 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/StreamColumn.java | 32 ++++++++ .../engine/coordinator/StreamDefinition.java | 35 +++++++++ .../coordinator/StreamDefinitionTest.java | 4 +- .../eagle/alert/engine/runner/AlertBolt.java | 20 ++++- .../alert/engine/router/TestAlertBolt.java | 81 ++++++++++++++++++++ 5 files changed, 169 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java index 9705efc..ba736fe 100644 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamColumn.java @@ -21,9 +21,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; +import java.util.Objects; + import javax.xml.bind.annotation.adapters.XmlAdapter; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.commons.lang3.builder.HashCodeBuilder; + public class StreamColumn implements Serializable { private static final long serialVersionUID = -5457861313624389106L; @@ -39,6 +43,34 @@ public class StreamColumn implements Serializable { name, type, defaultValue, required, nodataExpression); } + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(this.name) + .append(this.type) + .append(this.defaultValue) + .append(this.required) + .append(this.description) + .append(this.nodataExpression) + .build(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof StreamColumn)) { + return false; + } + return Objects.equals(this.name, ((StreamColumn) obj).name) + && Objects.equals(this.type, ((StreamColumn) obj).type) + && Objects.equals(this.defaultValue, ((StreamColumn) obj).defaultValue) + && Objects.equals(this.required, ((StreamColumn) obj).required) + && Objects.equals(this.description, ((StreamColumn) obj).description) + && Objects.equals(this.nodataExpression, ((StreamColumn) obj).nodataExpression); + } + public String getNodataExpression() { return nodataExpression; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java 
---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java index 1be36f3..9512f1a 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java @@ -18,9 +18,14 @@ package org.apache.eagle.alert.engine.coordinator; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlElementWrapper; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; + import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * This is actually a data source schema. 
@@ -62,6 +67,36 @@ public class StreamDefinition implements Serializable { columns); } + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(this.streamId) + .append(this.description) + .append(this.validate) + .append(this.timeseries) + .append(this.dataSource) + .append(this.siteId) + .append(this.columns) + .build(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof StreamDefinition)) { + return false; + } + return Objects.equals(this.streamId, ((StreamDefinition) obj).streamId) + && Objects.equals(this.description, ((StreamDefinition) obj).description) + && Objects.equals(this.validate, ((StreamDefinition) obj).validate) + && Objects.equals(this.timeseries, ((StreamDefinition) obj).timeseries) + && Objects.equals(this.dataSource, ((StreamDefinition) obj).dataSource) + && Objects.equals(this.siteId, ((StreamDefinition) obj).siteId) + && CollectionUtils.isEqualCollection(this.columns, ((StreamDefinition) obj).columns); + } + public String getStreamId() { return streamId; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java index b5015cd..e33ef07 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionTest.java @@ -45,8 +45,8 @@ public class 
StreamDefinitionTest { StreamDefinition streamDefinition1 = streamDefinition.copy(); Assert.assertEquals("StreamDefinition[streamId=null, dataSource=null, description=null, validate=false, timeseries=false, columns=[StreamColumn=name[name], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[host], type=[string], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[flag], type=[bool], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[data], type=[long], defaultValue=[null], required=[false], nodataExpression=[null], StreamColumn=name[value], type=[double], defaultValue=[null], required=[false], nodataExpression=[null]]", streamDefinition1.toString()); - Assert.assertFalse(streamDefinition1.equals(streamDefinition)); + Assert.assertTrue(streamDefinition1.equals(streamDefinition)); Assert.assertFalse(streamDefinition1 == streamDefinition); - Assert.assertFalse(streamDefinition1.hashCode() == streamDefinition.hashCode()); + Assert.assertTrue(streamDefinition1.hashCode() == streamDefinition.hashCode()); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java index 7d66f47..edf1b6f 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java @@ -16,6 +16,7 @@ */ package org.apache.eagle.alert.engine.runner; 
+import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -103,7 +104,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen pe.getEvent().setMetaVersion(specVersion); } else if (streamEventVersion != null && !streamEventVersion.equals(specVersion)) { if (specVersion != null && streamEventVersion != null - && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) { + && specVersion.contains("spec_version_") && streamEventVersion.contains("spec_version_")) { // check if specVersion is older than stream_event_version // Long timestamp_of_specVersion = Long.valueOf(specVersion.split("spec_version_")[1]); // Long timestamp_of_streamEventVersion = Long.valueOf(stream_event_version.split("spec_version_")[1]); @@ -195,6 +196,23 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen MapComparator<String, PolicyDefinition> comparator = new MapComparator<>(newPoliciesMap, cachedPolicies); comparator.compare(); + MapComparator<String, StreamDefinition> streamComparator = new MapComparator<>(sds, sdf); + streamComparator.compare(); + + List<StreamDefinition> addOrUpdatedStreams = streamComparator.getAdded(); + addOrUpdatedStreams.addAll(streamComparator.getModified()); + List<PolicyDefinition> cachedPoliciesTemp = new ArrayList<>(cachedPolicies.values()); + addOrUpdatedStreams.forEach(s -> { + cachedPoliciesTemp.stream().filter(p -> p.getInputStreams().contains(s.getStreamId()) + || p.getOutputStreams().contains(s.getStreamId())).forEach( + p -> { + if (!comparator.getModified().contains(p) && !comparator.getAdded().contains(p)) { + comparator.getModified().add(p); + } + }); + ; + }); + policyGroupEvaluator.onPolicyChange(comparator.getAdded(), comparator.getRemoved(), comparator.getModified(), sds); // update alert output collector 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7c631531/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java index e6acc3e..8ae29d5 100755 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertBolt.java @@ -46,6 +46,8 @@ import org.apache.eagle.common.DateTimeUtil; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; @@ -65,6 +67,8 @@ import static org.mockito.Mockito.when; public class TestAlertBolt { public static final String TEST_STREAM = "test-stream"; + + private static final Logger LOG = LoggerFactory.getLogger(TestAlertBolt.class); /** * Following knowledge is guaranteed in @@ -443,6 +447,83 @@ public class TestAlertBolt { Assert.assertTrue(recieved.get()); } + + @Test + public void testStreamDefinitionChange() throws IOException { + PolicyDefinition def = new PolicyDefinition(); + def.setName("policy-definition"); + def.setInputStreams(Arrays.asList(TEST_STREAM)); + + PolicyDefinition.Definition definition = new PolicyDefinition.Definition(); + definition.setType(PolicyStreamHandlers.CUSTOMIZED_ENGINE); + definition.setHandlerClass("org.apache.eagle.alert.engine.router.CustomizedHandler"); + definition.setValue("PT0M,plain,1,host,host1"); + def.setDefinition(definition); + 
def.setPartitionSpec(Arrays.asList(createPartition())); + + AlertBoltSpec boltSpecs = new AlertBoltSpec(); + + AtomicBoolean recieved = new AtomicBoolean(false); + OutputCollector collector = new OutputCollector(new IOutputCollector() { + @Override + public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { + recieved.set(true); + return Collections.emptyList(); + } + + @Override + public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { + } + + @Override + public void ack(Tuple input) { + } + + @Override + public void fail(Tuple input) { + } + + @Override + public void reportError(Throwable error) { + } + }); + AlertBolt bolt = createAlertBolt(collector); + + boltSpecs.getBoltPoliciesMap().put(bolt.getBoltId(), Arrays.asList(def)); + boltSpecs.setVersion("spec_" + System.currentTimeMillis()); + // stream def map + Map<String, StreamDefinition> sds = new HashMap(); + StreamDefinition sdTest = new StreamDefinition(); + sdTest.setStreamId(TEST_STREAM); + sds.put(sdTest.getStreamId(), sdTest); + + boltSpecs.addPublishPartition(TEST_STREAM, "policy-definition", "testAlertPublish", null); + + bolt.onAlertBoltSpecChange(boltSpecs, sds); + + // how to assert + Tuple t = createTuple(bolt, boltSpecs.getVersion()); + + bolt.execute(t); + + Assert.assertTrue(recieved.get()); + + LOG.info("Update stream"); + sds = new HashMap(); + sdTest = new StreamDefinition(); + sdTest.setStreamId(TEST_STREAM); + sds.put(sdTest.getStreamId(), sdTest); + sdTest.setDescription("update the stream"); + bolt.onAlertBoltSpecChange(boltSpecs, sds); + + LOG.info("No any change"); + sds = new HashMap(); + sdTest = new StreamDefinition(); + sdTest.setStreamId(TEST_STREAM); + sds.put(sdTest.getStreamId(), sdTest); + sdTest.setDescription("update the stream"); + bolt.onAlertBoltSpecChange(boltSpecs, sds); + } @Test @Ignore public void testMultiStreamDefinition() throws Exception {