Repository: eagle Updated Branches: refs/heads/master 1db33df5a -> e2fbb8613
[EAGLE-1038] Support alertDuplication customization for each policy https://issues.apache.org/jira/browse/EAGLE-1038 * support duplication check for each outputStream of a policy * compatible with the duplication check in old versions (check in a publisher) Author: Zhao, Qingwen <[email protected]> Closes #944 from qingwen220/minor. Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/e2fbb861 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/e2fbb861 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/e2fbb861 Branch: refs/heads/master Commit: e2fbb861389ebfeb7d0c96743f7a79b185857e43 Parents: 1db33df Author: Zhao, Qingwen <[email protected]> Authored: Thu Jun 8 10:21:04 2017 +0800 Committer: Zhao, Qingwen <[email protected]> Committed: Thu Jun 8 10:21:04 2017 +0800 ---------------------------------------------------------------------- .../engine/coordinator/AlertDeduplication.java | 13 +- .../engine/coordinator/PolicyDefinition.java | 70 ++++---- .../alert/engine/model/AlertStreamEvent.java | 9 + .../coordinator/AlertDeduplicationTest.java | 47 +++++ .../engine/publisher/dedup/DedupCache.java | 1 - .../engine/publisher/dedup/DedupEntity.java | 2 - .../alert/engine/publisher/dedup/DedupKey.java | 71 ++++++++ .../publisher/dedup/DefaultDeduplicator.java | 178 +++++++++++++++++++ .../alert/engine/publisher/dedup/EventUniq.java | 82 +++++++++ .../publisher/impl/AbstractPublishPlugin.java | 5 +- .../publisher/impl/DefaultDeduplicator.java | 173 ------------------ .../alert/engine/publisher/impl/EventUniq.java | 83 --------- .../alert/engine/runner/AlertPublisherBolt.java | 42 +++-- .../engine/publisher/dedup/DedupCacheTest.java | 1 - .../engine/publisher/dedup/DedupKeyTest.java | 56 ++++++ .../dedup/DefaultDedupWithoutStateTest.java | 1 - .../dedup/DefaultDeduplicatorTest.java | 1 - .../publisher/dedup/TestDeduplicator.java | 1 - .../engine/router/TestAlertPublisherBolt.java | 83 ++++++++- 19 files changed, 596 insertions(+), 323 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java index 78fef7a..d47d7d0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java @@ -24,9 +24,18 @@ import java.util.List; import java.util.Objects; public class AlertDeduplication { + private String outputStreamId; private String dedupIntervalMin; private List<String> dedupFields; + public String getOutputStreamId() { + return outputStreamId; + } + + public void setOutputStreamId(String outputStreamId) { + this.outputStreamId = outputStreamId; + } + public String getDedupIntervalMin() { return dedupIntervalMin; } @@ -46,6 +55,7 @@ public class AlertDeduplication { @Override public int hashCode() { return new HashCodeBuilder() + .append(outputStreamId) .append(dedupFields) .append(dedupIntervalMin) .build(); @@ -61,7 +71,8 @@ public class AlertDeduplication { } AlertDeduplication another = (AlertDeduplication) that; if (ListUtils.isEqualList(another.dedupFields, this.dedupFields) - && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) { + && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin) + && Objects.equals(another.outputStreamId, this.outputStreamId)) { return true; } return false; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index 5004513..698605e 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -43,7 +43,7 @@ public class PolicyDefinition implements Serializable { private Definition stateDefinition; private PolicyStatus policyStatus = PolicyStatus.ENABLED; private AlertDefinition alertDefinition; - private AlertDeduplication deduplication; + private List<AlertDeduplication> alertDeduplications = new ArrayList<>(); // one stream only have one partition in one policy, since we don't support stream alias private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>(); @@ -136,6 +136,38 @@ public class PolicyDefinition implements Serializable { this.policyStatus = policyStatus; } + public List<AlertDeduplication> getAlertDeduplications() { + return alertDeduplications; + } + + public void setAlertDeduplications(List<AlertDeduplication> alertDeduplications) { + this.alertDeduplications = alertDeduplications; + } + + public AlertDefinition getAlertDefinition() { + return alertDefinition; + } + + public void setAlertDefinition(AlertDefinition alertDefinition) { + this.alertDefinition = alertDefinition; + } + + public AlertSeverity getAlertSeverity() { + return alertDefinition == null ? null : alertDefinition.getSeverity(); + } + + public String getAlertCategory() { + return alertDefinition == null ? null : alertDefinition.getCategory(); + } + + public String getSiteId() { + return siteId; + } + + public void setSiteId(String siteId) { + this.siteId = siteId; + } + @Override public int hashCode() { return new HashCodeBuilder() @@ -148,7 +180,7 @@ public class PolicyDefinition implements Serializable { .append(policyStatus) .append(parallelismHint) .append(alertDefinition) - .append(deduplication) + .append(alertDeduplications) .build(); } @@ -175,44 +207,12 @@ public class PolicyDefinition implements Serializable { && another.policyStatus.equals(this.policyStatus) && another.parallelismHint == this.parallelismHint && Objects.equals(another.alertDefinition, alertDefinition) - && Objects.equals(another.deduplication, deduplication)) { + && CollectionUtils.isEqualCollection(another.alertDeduplications, alertDeduplications)) { return true; } return false; } - public AlertDefinition getAlertDefinition() { - return alertDefinition; - } - - public void setAlertDefinition(AlertDefinition alertDefinition) { - this.alertDefinition = alertDefinition; - } - - public AlertSeverity getAlertSeverity() { - return alertDefinition == null ? null : alertDefinition.getSeverity(); - } - - public String getAlertCategory() { - return alertDefinition == null ? null : alertDefinition.getCategory(); - } - - public String getSiteId() { - return siteId; - } - - public void setSiteId(String siteId) { - this.siteId = siteId; - } - - public AlertDeduplication getDeduplication() { - return deduplication; - } - - public void setDeduplication(AlertDeduplication deduplication) { - this.deduplication = deduplication; - } - @JsonIgnoreProperties(ignoreUnknown = true) public static class Definition implements Serializable { private static final long serialVersionUID = -622366527887848346L; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java index 00170df..3079f77 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java @@ -39,6 +39,7 @@ public class AlertStreamEvent extends StreamEvent { private long createdTime; private String category; private AlertSeverity severity = AlertSeverity.WARNING; + private boolean duplicationChecked = false; // ---------------------- // Lazy Alert Fields @@ -187,4 +188,12 @@ public class AlertStreamEvent extends StreamEvent { public void setSiteId(String siteId) { this.siteId = siteId; } + + public boolean isDuplicationChecked() { + return duplicationChecked; + } + + public void setDuplicationChecked(boolean duplicationChecked) { + this.duplicationChecked = duplicationChecked; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java new file mode 100644 index 0000000..59da244 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplicationTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; + +public class AlertDeduplicationTest { + + @Test + public void testEqual() { + AlertDeduplication deduplication1 = new AlertDeduplication(); + deduplication1.setDedupIntervalMin("1"); + deduplication1.setOutputStreamId("stream"); + + AlertDeduplication deduplication2 = new AlertDeduplication(); + deduplication2.setDedupIntervalMin("1"); + deduplication2.setOutputStreamId("stream"); + deduplication2.setDedupFields(new ArrayList<>()); + + Assert.assertFalse(deduplication1.equals(deduplication2)); + + AlertDeduplication deduplication3 = new AlertDeduplication(); + deduplication3.setDedupFields(new ArrayList<>()); + deduplication3.setOutputStreamId("stream"); + deduplication3.setDedupIntervalMin("1"); + + Assert.assertTrue(deduplication3.equals(deduplication2)); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java index abb83d6..96eeffd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCache.java @@ -26,7 +26,6 @@ import org.apache.commons.lang.time.DateUtils; import org.apache.commons.lang3.StringUtils; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java index 86bc9b3..e666c64 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupEntity.java @@ -20,8 +20,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; - public class DedupEntity { private String publishName; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java new file mode 100644 index 0000000..f6b03a4 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKey.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.publisher.dedup; + +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.Objects; + +public class DedupKey { + private String policyId; + private String outputStreamId; + + public DedupKey(String policyId, String outputStreamId) { + this.policyId = policyId; + this.outputStreamId = outputStreamId; + } + + public String getPolicyId() { + return policyId; + } + + public void setPolicyId(String policyId) { + this.policyId = policyId; + } + + public String getOutputStreamId() { + return outputStreamId; + } + + public void setOutputStreamId(String outputStreamId) { + this.outputStreamId = outputStreamId; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof DedupKey) { + DedupKey au = (DedupKey) obj; + return Objects.equals(au.getOutputStreamId(), this.outputStreamId) + && Objects.equals(au.getPolicyId(), this.policyId); + } + return false; + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(outputStreamId).append(policyId).build(); + } + + @Override + public String toString() { + return String.format("DedupKey[outputStreamId: %s, policyId: %s]", outputStreamId, policyId); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java new file mode 100644 index 0000000..2307d7a --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicator.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.alert.engine.publisher.dedup; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.AlertDeduplication; +import org.apache.eagle.alert.engine.coordinator.StreamColumn; +import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.model.AlertStreamEvent; +import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; +import org.joda.time.Period; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +public class DefaultDeduplicator implements AlertDeduplicator { + + private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); + + private long dedupIntervalSec; + private List<String> customDedupFields = new ArrayList<>(); + private String dedupStateField; + private String dedupStateCloseValue; + private AlertDeduplication alertDeduplication = null; + + private DedupCache dedupCache; + + private Cache<EventUniq, String> withoutStatesCache; + + public DefaultDeduplicator(AlertDeduplication alertDeduplication) { + this.alertDeduplication = alertDeduplication; + this.customDedupFields = alertDeduplication.getDedupFields(); + try { + this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60; + } catch (Exception e) { + LOG.error("de-duplication intervalSec {} parse error, use 30 min instead", alertDeduplication.getDedupIntervalMin(), e.getMessage()); + this.dedupIntervalSec = 1800; + } + this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( + this.dedupIntervalSec, TimeUnit.SECONDS).build(); + + LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields); + } + + public DefaultDeduplicator(String intervalMin, List<String> customDedupFields, + String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) { + setDedupIntervalMin(intervalMin); + if (customDedupFields != null) { + this.customDedupFields = customDedupFields; + } + if (StringUtils.isNotBlank(dedupStateField)) { + this.dedupStateField = dedupStateField; + } + if (StringUtils.isNotBlank(dedupStateCloseValue)) { + this.dedupStateCloseValue = dedupStateCloseValue; + } + this.dedupCache = dedupCache; + + withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( + this.dedupIntervalSec, TimeUnit.SECONDS).build(); + + LOG.info("initialize DefaultDeduplicator with dedupIntervalSec={}, customDedupFields={}", dedupIntervalSec, customDedupFields); + } + + /* + * @param key + * @return + */ + private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { + if (StringUtils.isBlank(stateFiledValue)) { + // without state field, we cannot determine whether it is duplicated + // without custom filed values, we cannot determine whether it is duplicated + synchronized (withoutStatesCache) { + if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key); + } + return null; + } else if (withoutStatesCache != null) { + withoutStatesCache.put(key, ""); + } + } + return Arrays.asList(event); + } + return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue); + } + + public List<AlertStreamEvent> dedup(AlertStreamEvent event) { + if (event == null) { + return null; + } + if (dedupIntervalSec <= 0) { + return Collections.singletonList(event); + } + + // check custom field, and get the field values + StreamDefinition streamDefinition = event.getSchema(); + HashMap<String, String> customFieldValues = new HashMap<>(); + String stateFiledValue = null; + for (int i = 0; i < event.getData().length; i++) { + if (i > streamDefinition.getColumns().size()) { + if (LOG.isWarnEnabled()) { + LOG.warn("output column does not found for event data, this indicate code error!"); + } + continue; + } + String colName = streamDefinition.getColumns().get(i).getName(); + Object colValue = event.getData()[i]; + + if (colName.equals(dedupStateField) && colValue != null) { + stateFiledValue = colValue.toString(); + } + + // make all of the field as unique key if no custom dedup field provided + if (colValue != null) { + if (customDedupFields == null || customDedupFields.size() <= 0) { + if (streamDefinition.getColumns().get(i).getType().equals(StreamColumn.Type.STRING)) { + customFieldValues.put(colName, colValue.toString()); + } + } else { + for (String field : customDedupFields) { + if (colName.equals(field)) { + customFieldValues.put(field, colValue.toString()); + break; + } + } + } + } + } + + List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(), + event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue); + if (outputEvents != null && outputEvents.size() > 0) { + return outputEvents; + } else if (LOG.isInfoEnabled()) { + LOG.info("Alert event is skipped because it's duplicated: {}", event.toString()); + } + return null; + } + + @Override + public void setDedupIntervalMin(String newDedupIntervalMin) { + if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) { + dedupIntervalSec = 0; + return; + } + try { + Period period = Period.parse(newDedupIntervalMin); + this.dedupIntervalSec = period.toStandardSeconds().getSeconds(); + } catch (Exception e) { + LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e); + this.dedupIntervalSec = 0; + } + } + + public AlertDeduplication getAlertDeduplication() { + return alertDeduplication; + } + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java new file mode 100644 index 0000000..1434bb7 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/dedup/EventUniq.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * + */ +package org.apache.eagle.alert.engine.publisher.dedup; + +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import com.google.common.base.Joiner; + +import java.util.HashMap; + +/** + * @since Mar 19, 2015. + */ +public class EventUniq { + public String streamId; + public String policyId; + public Long timestamp; // event's createTimestamp + public long createdTime; // created time, for cache removal; + public HashMap<String, String> customFieldValues; + public boolean removable = false; + + public EventUniq(String streamId, String policyId, long timestamp) { + this.streamId = streamId; + this.timestamp = timestamp; + this.policyId = policyId; + this.createdTime = System.currentTimeMillis(); + } + + public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) { + this.streamId = streamId; + this.timestamp = timestamp; + this.policyId = policyId; + this.createdTime = System.currentTimeMillis(); + this.customFieldValues = customFieldValues; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof EventUniq) { + EventUniq au = (EventUniq) obj; + boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId); + if (this.customFieldValues != null && au.customFieldValues != null) { + result = result & this.customFieldValues.equals(au.customFieldValues); + } + return result; + } + return false; + } + + @Override + public int hashCode() { + HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId); + + if (customFieldValues != null) { + builder.append(customFieldValues); + } + return builder.build(); + } + + @Override + public String toString() { + return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, removable: %s, customFieldValues: %s]", + streamId, policyId, timestamp, removable, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index c5c9e04..771f736 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -26,6 +26,7 @@ import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.dedup.DedupCache; +import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator; import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator; import org.slf4j.Logger; @@ -93,7 +94,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin { } serializer = (IEventSerializer) obj; } catch (Exception e) { - getLogger().error(String.format("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage()), e); + getLogger().error("initialized failed, use default StringEventSerializer, failure message : {}", e.getMessage(), e); serializer = new StringEventSerializer(conf); } } @@ -105,7 +106,7 @@ public abstract class AbstractPublishPlugin implements AlertPublishPlugin { @Override public List<AlertStreamEvent> dedup(AlertStreamEvent event) { - if (null != deduplicator) { + if (null != deduplicator && !event.isDuplicationChecked()) { return deduplicator.dedup(event); } else { return Collections.singletonList(event); http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java deleted file mode 100644 index 54d551e..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.apache.commons.lang3.StringUtils; -import org.apache.eagle.alert.engine.coordinator.AlertDeduplication; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; -import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; -import org.apache.eagle.alert.engine.publisher.dedup.DedupCache; -import org.joda.time.Period; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class DefaultDeduplicator implements AlertDeduplicator { - - private static Logger LOG = LoggerFactory.getLogger(DefaultDeduplicator.class); - - private long dedupIntervalSec; - private List<String> customDedupFields = new ArrayList<>(); - private String dedupStateField; - private String dedupStateCloseValue; - - private DedupCache dedupCache; - - private Cache<EventUniq, String> withoutStatesCache; - - public DefaultDeduplicator() { - this.dedupIntervalSec = 0; - } - - public DefaultDeduplicator(String intervalMin) { - setDedupIntervalMin(intervalMin); - } - - public DefaultDeduplicator(long intervalMin) { - this.dedupIntervalSec = intervalMin; - } - - public DefaultDeduplicator(AlertDeduplication alertDeduplication) { - this.customDedupFields = alertDeduplication.getDedupFields(); - this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60; - this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( - this.dedupIntervalSec, TimeUnit.SECONDS).build(); - } - - public DefaultDeduplicator(String intervalMin, List<String> customDedupFields, - String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) { - setDedupIntervalMin(intervalMin); - if (customDedupFields != null) { - this.customDedupFields = customDedupFields; - } - if (StringUtils.isNotBlank(dedupStateField)) { - this.dedupStateField = dedupStateField; - } - if (StringUtils.isNotBlank(dedupStateCloseValue)) { - this.dedupStateCloseValue = dedupStateCloseValue; - } - this.dedupCache = dedupCache; - - withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( - this.dedupIntervalSec, TimeUnit.SECONDS).build(); - } - - /* - * @param key - * @return - */ - private List<AlertStreamEvent> checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { - if (StringUtils.isBlank(stateFiledValue)) { - // without state field, we cannot determine whether it is duplicated - // without custom filed values, we cannot determine whether it is duplicated - synchronized (withoutStatesCache) { - if (withoutStatesCache != null && withoutStatesCache.getIfPresent(key) != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Alert event {} with key {} is skipped since it is duplicated", event, key); - } - return null; - } else if (withoutStatesCache != null) { - withoutStatesCache.put(key, ""); - } - } - return Arrays.asList(event); - } - return dedupCache.dedup(event, key, dedupStateField, stateFiledValue, dedupStateCloseValue); - } - - public List<AlertStreamEvent> dedup(AlertStreamEvent event) { - if (event == null) { - return null; - } - // check custom field, and get the field values - StreamDefinition streamDefinition = event.getSchema(); - HashMap<String, String> customFieldValues = new HashMap<>(); - String stateFiledValue = null; - for (int i = 0; i < event.getData().length; i++) { - if (i > streamDefinition.getColumns().size()) { - if (LOG.isWarnEnabled()) { - LOG.warn("output column does not found for event data, this indicate code error!"); - } - continue; - } - String colName = streamDefinition.getColumns().get(i).getName(); - Object colValue = event.getData()[i]; - - if (colName.equals(dedupStateField) && colValue != null) { - stateFiledValue = colValue.toString(); - } - - // make all of the field as unique key if no custom dedup field provided - if (colValue != null) { - if (customDedupFields == null || customDedupFields.size() <= 0) { - customFieldValues.put(colName, colValue.toString()); - } else { - for (String field : customDedupFields) { - if (colName.equals(field)) { - customFieldValues.put(field, colValue.toString()); - break; - } - } - } - } - } - - List<AlertStreamEvent> outputEvents = checkDedup(event, new EventUniq(event.getStreamId(), - event.getPolicyId(), event.getCreatedTime(), customFieldValues), stateFiledValue); - if (outputEvents != null && outputEvents.size() > 0) { - return outputEvents; - } else if (LOG.isInfoEnabled()) { - LOG.info("Alert event is skipped because it's duplicated: {}", event.toString()); - } - return null; - } - - @Override - public void setDedupIntervalMin(String newDedupIntervalMin) { - if (newDedupIntervalMin == null || newDedupIntervalMin.isEmpty()) { - dedupIntervalSec = 0; - return; - } - try { - Period period = Period.parse(newDedupIntervalMin); - this.dedupIntervalSec = period.toStandardSeconds().getSeconds(); - } catch (Exception e) { - LOG.warn("Fail to pares deDupIntervalMin, will disable deduplication instead", e); - this.dedupIntervalSec = 0; - } - } - -} http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java deleted file mode 100644 index 511abcd..0000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/EventUniq.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * - */ -package org.apache.eagle.alert.engine.publisher.impl; - -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import com.google.common.base.Joiner; - -import java.util.HashMap; - -/** - * @since Mar 19, 2015. - */ -public class EventUniq { - public String streamId; - public String policyId; - public Long timestamp; // event's createTimestamp - public long createdTime; // created time, for cache removal; - public HashMap<String, String> customFieldValues; - public boolean removable = false; - - public EventUniq(String streamId, String policyId, long timestamp) { - this.streamId = streamId; - this.timestamp = timestamp; - this.policyId = policyId; - this.createdTime = System.currentTimeMillis(); - } - - public EventUniq(String streamId, String policyId, long timestamp, HashMap<String, String> customFieldValues) { - this.streamId = streamId; - this.timestamp = timestamp; - this.policyId = policyId; - this.createdTime = System.currentTimeMillis(); - this.customFieldValues = customFieldValues; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof EventUniq) { - EventUniq au = (EventUniq) obj; - boolean result = this.streamId.equalsIgnoreCase(au.streamId) & this.policyId.equalsIgnoreCase(au.policyId); - if (this.customFieldValues != null && au.customFieldValues != null) { - result = result & this.customFieldValues.equals(au.customFieldValues); - } - return result; - } - return false; - } - - @Override - public int hashCode() { - HashCodeBuilder builder = new HashCodeBuilder().append(streamId).append(policyId); - - if (customFieldValues != null) { - builder.append(customFieldValues); - } - return builder.build(); - } - - @Override - public String toString() { - return String.format("EventUniq[streamId: %s, policyId: %s, timestamp: %s, removable: %s, customFieldValues: %s]", - streamId, policyId, timestamp, removable, Joiner.on(",").withKeyValueSeparator(">").join(customFieldValues)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index d6829d6..39c577c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -28,22 +28,17 @@ import org.apache.eagle.alert.engine.StreamContextImpl; import org.apache.eagle.alert.engine.coordinator.*; import org.apache.eagle.alert.engine.model.AlertPublishEvent; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener; -import org.apache.eagle.alert.engine.publisher.AlertPublisher; -import org.apache.eagle.alert.engine.publisher.AlertStreamFilter; -import org.apache.eagle.alert.engine.publisher.PipeStreamFilter; +import org.apache.eagle.alert.engine.publisher.*; +import org.apache.eagle.alert.engine.publisher.dedup.DedupKey; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; -import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; +import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider; import org.apache.eagle.alert.utils.AlertConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -53,7 +48,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli private volatile Map<String, Publishment> cachedPublishments = new HashMap<>(); private volatile Map<String, PolicyDefinition> policyDefinitionMap; private volatile Map<String, StreamDefinition> streamDefinitionMap; - private volatile Map<String, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>(); + private volatile Map<DedupKey, DefaultDeduplicator> deduplicatorMap = new ConcurrentHashMap<>(); private AlertTemplateEngine alertTemplateEngine; private boolean logEventEnabled; @@ -90,13 +85,16 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli if (logEventEnabled) { LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event); } - if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) { - List<AlertStreamEvent> eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event); + DedupKey dedupKey = new DedupKey(event.getPolicyId(), event.getStreamId()); + if (deduplicatorMap != null && deduplicatorMap.containsKey(dedupKey)) { + List<AlertStreamEvent> eventList = deduplicatorMap.get(dedupKey).dedup(event); if (eventList == null || eventList.isEmpty()) { collector.ack(input); return; } + event.setDuplicationChecked(true); } + AlertStreamEvent filteredEvent = alertFilter.filter(event); if (filteredEvent != null) { alertPublisher.nextEvent(partition, filteredEvent); @@ -161,8 +159,15 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) { try { this.alertTemplateEngine.register(entry.getValue()); - if (entry.getValue().getDeduplication() != null) { - this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication())); + List<AlertDeduplication> alertDeduplications = entry.getValue().getAlertDeduplications(); + if (alertDeduplications != null && alertDeduplications.size() > 0) { + for (AlertDeduplication deduplication : alertDeduplications) { + DedupKey dedupKey = new DedupKey(entry.getKey(), deduplication.getOutputStreamId()); + if (!deduplicatorMap.containsKey(dedupKey) + || !deduplicatorMap.get(dedupKey).getAlertDeduplication().equals(deduplication)) { + deduplicatorMap.put(dedupKey, new DefaultDeduplicator(deduplication)); + } + } } } catch (Throwable throwable) { LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable); @@ -172,8 +177,10 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli for (String policyId : policyToRemove) { try { this.alertTemplateEngine.unregister(policyId); - if (deduplicatorMap != null && deduplicatorMap.containsKey(policyId)) { - deduplicatorMap.remove(policyId); + for (DedupKey dedupKey : deduplicatorMap.keySet()) { + if (dedupKey.getPolicyId().equals(policyId)) { + deduplicatorMap.remove(dedupKey); + } } } catch (Throwable throwable) { LOG.error("Failed to unregister policy {} from template engine", policyId, throwable); @@ -231,4 +238,5 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli return this.alertTemplateEngine.filter(event); } } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java index 5bf0410..95e679d 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupCacheTest.java @@ -27,7 +27,6 @@ import org.apache.eagle.alert.engine.coordinator.StreamColumn; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.coordinator.StreamPartition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java new file mode 100644 index 0000000..7fc886d --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DedupKeyTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.publisher.dedup; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class DedupKeyTest { + + @Test + public void test() { + Map<DedupKey, Integer> testMap = new HashMap<>(); + DedupKey key1 = new DedupKey("policy1", "stream1"); + update(testMap, key1); + update(testMap, key1); + + DedupKey key2 = new DedupKey("policy2", "stream2"); + update(testMap, key2); + + Assert.assertTrue(testMap.get(key1) == 1); + Assert.assertTrue(testMap.get(key2) == 0); + + DedupKey key3 = new DedupKey("policy1", "stream1"); + update(testMap, key3); + + Assert.assertTrue(testMap.get(key3) == 2); + } + + private void update(Map<DedupKey, Integer> map, DedupKey key) { + if (map.containsKey(key)) { + map.put(key, map.get(key) + 1); + } else { + map.put(key, 0); + } + } +} + + http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java index c48df9a..f839474 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java @@ -22,7 +22,6 @@ import com.typesafe.config.ConfigFactory; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java index 297b790..96da4c9 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java @@ -22,7 +22,6 @@ import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java index 247f332..51d054c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/TestDeduplicator.java @@ -18,7 +18,6 @@ package org.apache.eagle.alert.engine.publisher.dedup; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; -import org.apache.eagle.alert.engine.publisher.impl.EventUniq; import org.junit.Ignore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/eagle/blob/e2fbb861/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java index 46517fe..2cd2183 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/router/TestAlertPublisherBolt.java @@ -18,22 +18,23 @@ package org.apache.eagle.alert.engine.router; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.eagle.alert.coordination.model.PublishSpec; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; -import org.apache.eagle.alert.engine.coordinator.PublishPartition; -import org.apache.eagle.alert.engine.coordinator.Publishment; -import org.apache.eagle.alert.engine.coordinator.StreamColumn; -import org.apache.eagle.alert.engine.coordinator.StreamDefinition; +import org.apache.eagle.alert.engine.coordinator.*; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertPublishPlugin; import org.apache.eagle.alert.engine.publisher.AlertPublisher; +import org.apache.eagle.alert.engine.publisher.dedup.DedupKey; +import org.apache.eagle.alert.engine.publisher.dedup.DefaultDeduplicator; import org.apache.eagle.alert.engine.publisher.impl.AlertPublishPluginsFactory; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; +import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine; +import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider; import org.apache.eagle.alert.engine.runner.AlertPublisherBolt; import org.apache.eagle.alert.engine.runner.MapComparator; import org.apache.eagle.alert.engine.utils.MetadataSerDeser; @@ -300,4 +301,76 @@ public class TestAlertPublisherBolt { pubs.get(0).getName(), pubs.get(0).getPartitionColumns()), event3); } + + @Test + public void testOnAlertPolicyChange() throws IllegalAccessException, NoSuchFieldException { + AlertDeduplication deduplication = new AlertDeduplication(); + deduplication.setDedupIntervalMin("1"); + deduplication.setOutputStreamId("stream"); + + PolicyDefinition policy1 = new PolicyDefinition(); + policy1.setName("policy1"); + policy1.getAlertDeduplications().add(deduplication); + + Map<String, PolicyDefinition> pds1 = new HashMap<>(); + pds1.put("policy1", policy1); + + PolicyDefinition policy2 = new PolicyDefinition(); + policy2.setName("policy2"); + policy2.getAlertDeduplications().add(deduplication); + + Map<String, PolicyDefinition> pds2 = new HashMap<>(); + pds2.put("policy2", policy2); + + AlertPublisherBolt bolt = new AlertPublisherBolt("publisher", null, null); + + Field field = AlertPublisherBolt.class.getDeclaredField("alertTemplateEngine"); + field.setAccessible(true); + AlertTemplateEngine engine = AlertTemplateProvider.createAlertTemplateEngine(); + engine.init(null); + field.set(bolt, engine); + + DedupKey dedupKey1 = new DedupKey("policy1", "stream"); + DedupKey dedupKey2 = new DedupKey("policy2", "stream"); + bolt.onAlertPolicyChange(pds1, null); + Map<DedupKey, DefaultDeduplicator> deduplicatorMap = getAlertDeduplicator(bolt); + Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1)); + + // remove policy1 and add policy2 + bolt.onAlertPolicyChange(pds2, null); + deduplicatorMap = getAlertDeduplicator(bolt); + Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2)); + Assert.assertFalse(deduplicatorMap.containsKey(dedupKey1)); + + // add new policy policy1 in pds2 + pds2.put("policy1", policy1); + bolt.onAlertPolicyChange(pds2, null); + deduplicatorMap = getAlertDeduplicator(bolt); + Assert.assertTrue(deduplicatorMap.containsKey(dedupKey1)); + Assert.assertTrue(deduplicatorMap.containsKey(dedupKey2)); + Assert.assertTrue(deduplicatorMap.get(dedupKey1).getAlertDeduplication() + .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication())); + + // update policy1 alertDeduplication + AlertDeduplication deduplication1 = new AlertDeduplication(); + deduplication1.setOutputStreamId("stream"); + deduplication1.setDedupIntervalMin("2"); + policy1.getAlertDeduplications().clear(); + policy1.getAlertDeduplications().add(deduplication1); + pds2.put("policy1", policy1); + bolt.onAlertPolicyChange(pds2, null); + deduplicatorMap = getAlertDeduplicator(bolt); + Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy2", "stream"))); + Assert.assertTrue(deduplicatorMap.containsKey(new DedupKey("policy1", "stream"))); + Assert.assertFalse(deduplicatorMap.get(dedupKey1).getAlertDeduplication() + .equals(deduplicatorMap.get(dedupKey2).getAlertDeduplication())); + + } + + + private Map<DedupKey, DefaultDeduplicator> getAlertDeduplicator(AlertPublisherBolt bolt) throws NoSuchFieldException, IllegalAccessException { + Field field = AlertPublisherBolt.class.getDeclaredField("deduplicatorMap"); + field.setAccessible(true); + return (Map<DedupKey, DefaultDeduplicator>) field.get(bolt); + } }
