Repository: incubator-eagle Updated Branches: refs/heads/master eaab4a9e0 -> a1c5eca05
EAGLE-741: Make publishment settings both policy & stream aware. Currently a publishment is defined per policy, so we cannot publish alerts to different places for one policy even when that policy has multiple output streams. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/ccc5ffb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/ccc5ffb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/ccc5ffb5 Branch: refs/heads/master Commit: ccc5ffb528f8ffd2c750ad57d5bbb3ad5bcbdab8 Parents: ca0fae4 Author: Xiancheng Li <xiancheng...@ebay.com> Authored: Mon Nov 7 13:55:20 2016 +0800 Committer: Xiancheng Li <xiancheng...@ebay.com> Committed: Mon Nov 7 16:31:07 2016 +0800 ---------------------------------------------------------------------- .../alert/engine/coordinator/Publishment.java | 13 ++- .../publisher/impl/AbstractPublishPlugin.java | 1 - .../publisher/impl/AlertPublisherImpl.java | 101 +++++++++++++------ .../engine/router/TestAlertPublisherBolt.java | 2 + .../src/test/resources/testPublishSpec.json | 6 +- .../src/test/resources/testPublishSpec2.json | 6 +- .../src/test/resources/testPublishSpec3.json | 42 ++++++++ 7 files changed, 135 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java index b224a4b..dbb1844 100644 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java @@ -32,6 +32,7 @@ public class Publishment { private String name; private String type; private List<String> policyIds; + private List<String> streamIds; private String dedupIntervalMin; private List<String> dedupFields; private String dedupStateField; @@ -97,6 +98,14 @@ public class Publishment { this.policyIds = policyIds; } + public List<String> getStreamIds() { + return streamIds; + } + + public void setStreamIds(List<String> streamIds) { + this.streamIds = streamIds; + } + public String getDedupIntervalMin() { return dedupIntervalMin; } @@ -130,7 +139,9 @@ public class Publishment { && Objects.equals(dedupFields, p.getDedupFields()) && Objects.equals(dedupStateField, p.getDedupStateField()) && Objects.equals(overrideDeduplicator, p.getOverrideDeduplicator()) - && Objects.equals(policyIds, p.getPolicyIds()) && properties.equals(p.getProperties())); + && Objects.equals(policyIds, p.getPolicyIds()) + && Objects.equals(streamIds, p.getStreamIds()) + && properties.equals(p.getProperties())); } return false; } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index f0f048d..f68ae52 100644 --- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -23,7 +23,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.codec.IEventSerializer; import org.apache.eagle.alert.engine.coordinator.OverrideDeduplicatorSpec; import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index 87ac30f..210fd1b 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * <p/> + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,8 +17,16 @@ package org.apache.eagle.alert.engine.publisher.impl; -import com.typesafe.config.Config; -import org.apache.commons.collections.ListUtils; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.Publishment; import org.apache.eagle.alert.engine.model.AlertStreamEvent; @@ -27,20 +35,19 @@ import org.apache.eagle.alert.engine.publisher.AlertPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import com.typesafe.config.Config; @SuppressWarnings("rawtypes") public class AlertPublisherImpl implements AlertPublisher { + private static final long serialVersionUID = 4809983246198138865L; private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherImpl.class); + + private static final String STREAM_NAME_DEFAULT = "default"; + private final String name; - private volatile Map<String, List<String>> policyPublishPluginMapping = new ConcurrentHashMap<>(1); + private volatile Map<String, Set<String>> psPublishPluginMapping = new ConcurrentHashMap<>(1); private volatile Map<String, AlertPublishPlugin> publishPluginMapping = new ConcurrentHashMap<>(1); private Config config; private Map conf; @@ -74,13 +81,18 @@ public class 
AlertPublisherImpl implements AlertPublisher { LOG.warn("policyId cannot be null for event to be published"); return; } - List<String> pubIds = policyPublishPluginMapping.get(policyId); + // use default stream name if specified stream publisher is not found + Set<String> pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId, event.getStreamId())); if (pubIds == null) { - LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId); + pubIds = psPublishPluginMapping.get(getPolicyStreamUniqueId(policyId)); + } + if (pubIds == null) { + LOG.warn("Policy {} Stream {} does *NOT* subscribe any publishment!", policyId, event.getStreamId()); return; } for (String pubId : pubIds) { + @SuppressWarnings("resource") AlertPublishPlugin plugin = pubId != null ? publishPluginMapping.get(pubId) : null; if (plugin == null) { LOG.warn("Policy {} does *NOT* subscribe any publishment!", policyId); @@ -100,12 +112,11 @@ public class AlertPublisherImpl implements AlertPublisher { publishPluginMapping.values().forEach(plugin -> plugin.close()); } - @SuppressWarnings("unchecked") @Override public synchronized void onPublishChange(List<Publishment> added, - List<Publishment> removed, - List<Publishment> afterModified, - List<Publishment> beforeModified) { + List<Publishment> removed, + List<Publishment> afterModified, + List<Publishment> beforeModified) { if (added == null) { added = new ArrayList<>(); } @@ -125,7 +136,7 @@ public class AlertPublisherImpl implements AlertPublisher { } // copy and swap to avoid concurrency issue - Map<String, List<String>> newPolicyPublishPluginMapping = new HashMap<>(policyPublishPluginMapping); + Map<String, Set<String>> newPSPublishPluginMapping = new HashMap<>(psPublishPluginMapping); Map<String, AlertPublishPlugin> newPublishMap = new HashMap<>(publishPluginMapping); // added @@ -135,7 +146,7 @@ public class AlertPublisherImpl implements AlertPublisher { AlertPublishPlugin plugin = 
AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (plugin != null) { newPublishMap.put(publishment.getName(), plugin); - addPublishmentPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), publishment.getName()); + addPublishmentPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(), publishment.getStreamIds(), publishment.getName()); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -144,7 +155,7 @@ public class AlertPublisherImpl implements AlertPublisher { List<AlertPublishPlugin> toBeClosed = new ArrayList<>(); for (Publishment publishment : removed) { String pubName = publishment.getName(); - removePublihsPolicies(newPolicyPublishPluginMapping, publishment.getPolicyIds(), pubName); + removePublihsPoliciesStreams(newPSPublishPluginMapping, publishment.getPolicyIds(), pubName); toBeClosed.add(newPublishMap.get(pubName)); newPublishMap.remove(publishment.getName()); } @@ -152,13 +163,14 @@ public class AlertPublisherImpl implements AlertPublisher { for (int i = 0; i < afterModified.size(); i++) { String pubName = afterModified.get(i).getName(); List<String> newPolicies = afterModified.get(i).getPolicyIds(); + List<String> newStreams = afterModified.get(i).getStreamIds(); List<String> oldPolicies = beforeModified.get(i).getPolicyIds(); + List<String> oldStreams = beforeModified.get(i).getStreamIds(); - if (!newPolicies.equals(oldPolicies)) { - List<String> deletedPolicies = ListUtils.subtract(oldPolicies, newPolicies); - removePublihsPolicies(newPolicyPublishPluginMapping, deletedPolicies, pubName); - List<String> addedPolicies = ListUtils.subtract(newPolicies, oldPolicies); - addPublishmentPolicies(newPolicyPublishPluginMapping, addedPolicies, pubName); + if (!newPolicies.equals(oldPolicies) || !newStreams.equals(oldStreams)) { + // since both policy & stream may change, skip the compare and difference update + 
removePublihsPoliciesStreams(newPSPublishPluginMapping, oldPolicies, pubName); + addPublishmentPoliciesStreams(newPSPublishPluginMapping, newPolicies, newStreams, pubName); } Publishment newPub = afterModified.get(i); newPublishMap.get(pubName).update(newPub.getDedupIntervalMin(), newPub.getProperties()); @@ -166,7 +178,7 @@ public class AlertPublisherImpl implements AlertPublisher { // now do the swap publishPluginMapping = newPublishMap; - policyPublishPluginMapping = newPolicyPublishPluginMapping; + psPublishPluginMapping = newPSPublishPluginMapping; // safely close : it depend on plugin to check if want to wait all data to be flushed. closePlugins(toBeClosed); @@ -182,26 +194,51 @@ public class AlertPublisherImpl implements AlertPublisher { } } - private void addPublishmentPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> addedPolicyIds, String pubName) { + private void addPublishmentPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping, + List<String> addedPolicyIds, List<String> addedStreamIds, String pubName) { if (addedPolicyIds == null || pubName == null) { return; } + if (addedStreamIds == null || addedStreamIds.size() <= 0) { + addedStreamIds = new ArrayList<String>(); + addedStreamIds.add(STREAM_NAME_DEFAULT); + } + for (String policyId : addedPolicyIds) { - newPolicyPublishPluginMapping.putIfAbsent(policyId, new ArrayList<>()); - newPolicyPublishPluginMapping.get(policyId).add(pubName); + for (String streamId : addedStreamIds) { + String psUniqueId = getPolicyStreamUniqueId(policyId, streamId); + newPSPublishPluginMapping.putIfAbsent(psUniqueId, new HashSet<>()); + newPSPublishPluginMapping.get(psUniqueId).add(pubName); + } } } - private synchronized void removePublihsPolicies(Map<String, List<String>> newPolicyPublishPluginMapping, List<String> deletedPolicyIds, String pubName) { + private synchronized void removePublihsPoliciesStreams(Map<String, Set<String>> newPSPublishPluginMapping, + List<String> 
deletedPolicyIds, String pubName) { if (deletedPolicyIds == null || pubName == null) { return; } for (String policyId : deletedPolicyIds) { - List<String> publishIds = newPolicyPublishPluginMapping.get(policyId); - publishIds.remove(pubName); + for (Entry<String, Set<String>> entry : newPSPublishPluginMapping.entrySet()) { + if (entry.getKey().startsWith("policyId:" + policyId)) { + entry.getValue().remove(pubName); + break; + } + } + } + } + + private String getPolicyStreamUniqueId(String policyId) { + return getPolicyStreamUniqueId(policyId, STREAM_NAME_DEFAULT); + } + + private String getPolicyStreamUniqueId(String policyId, String streamId) { + if (StringUtils.isBlank(streamId)) { + streamId = STREAM_NAME_DEFAULT; } + return String.format("policyId:%s,streamId:%s", policyId, streamId); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index 5cdb6f1..c95cab1 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -136,6 +136,7 @@ public class TestAlertPublisherBolt { public void testMapComparator() { PublishSpec spec1 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec.json"), PublishSpec.class); PublishSpec spec2 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec2.json"), 
PublishSpec.class); + PublishSpec spec3 = MetadataSerDeser.deserialize(getClass().getResourceAsStream("/testPublishSpec3.json"), PublishSpec.class); Map<String, Publishment> map1 = new HashMap<>(); Map<String, Publishment> map2 = new HashMap<>(); spec1.getPublishments().forEach(p -> map1.put(p.getName(), p)); @@ -148,6 +149,7 @@ public class TestAlertPublisherBolt { AlertPublisherBolt publisherBolt = new AlertPublisherBolt("alert-publisher-test", null, null); publisherBolt.onAlertPublishSpecChange(spec1, null); publisherBolt.onAlertPublishSpecChange(spec2, null); + publisherBolt.onAlertPublishSpecChange(spec3, null); } @Test http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json index 70ea6b3..a8f4105 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec.json @@ -11,6 +11,9 @@ "policy2", "policy3" ], + "streamIds": [ + "stream1" + ], "dedupIntervalMin": "PT1M", "properties": { "subject": "Test Alert", @@ -24,7 +27,8 @@ "mail.debug": "false", "mail.connection": "tls", "mail.smtp.port": "587" - } + }, + "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer" } /* { "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json ---------------------------------------------------------------------- diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json index e14db43..e31e1f4 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec2.json @@ -9,6 +9,9 @@ "policyIds": [ "policy1" ], + "streamIds": [ + "stream1" + ], "dedupIntervalMin": "PT2M", "properties": { "subject": "Test Alert", @@ -22,7 +25,8 @@ "mail.debug": "false", "mail.connection": "tls", "mail.smtp.port": "587" - } + }, + "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer" } // { // "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ccc5ffb5/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json new file mode 100644 index 0000000..0bf0e2a --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/testPublishSpec3.json @@ -0,0 +1,42 @@ +{ + "version": "version1", + "topologyName": "testTopology", + "boltId": "alertPublishBolt", + "publishments": [ + { + "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher", + "name": "email-testAlertStream", + "policyIds": [ + "policy1" + ], + "streamIds": [ + "stream2" + ], + "dedupIntervalMin": "PT2M", + "properties": { + "subject": "Test Alert", + "template": "", + "sender": "sen...@corp.com", + "recipients": "recei...@corp.com", + "mail.smtp.host": "smtp.mailhost.com", + "mail.smtp.auth": "true", + 
"mail.username": "username", + "mail.password": "password", + "mail.debug": "false", + "mail.connection": "tls", + "mail.smtp.port": "587" + }, + "serializer": "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer" + } + // { + // "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher", + // "name":"kafka-testAlertStream", + // "policyIds": ["testPolicy"], + // "dedupIntervalMin": "PT1M", + // "properties":{ + // "kafka_broker":"sandbox.hortonworks.com:6667", + // "topic":"test_kafka" + // } + // } + ] +} \ No newline at end of file