apucher closed pull request #3463: [TE] Implement Threshold based Time Window
Suppressor
URL: https://github.com/apache/incubator-pinot/pull/3463
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
index 3a51c950d5..61d118f2ca 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
@@ -85,8 +85,7 @@ public DetectionAlertFilter
loadAlertFilter(DetectionAlertConfigDTO alertConfig,
return detectionAlertSchemeSet;
}
- public Set<DetectionAlertSuppressor>
loadAlertSuppressors(DetectionAlertConfigDTO alertConfig,
- ThirdEyeAnomalyConfiguration thirdeyeConfig) throws Exception {
+ public Set<DetectionAlertSuppressor>
loadAlertSuppressors(DetectionAlertConfigDTO alertConfig) throws Exception {
Preconditions.checkNotNull(alertConfig);
Set<DetectionAlertSuppressor> detectionAlertSuppressors = new HashSet<>();
Map<String, Map<String, Object>> alertSuppressors =
alertConfig.getAlertSuppressors();
@@ -99,8 +98,8 @@ public DetectionAlertFilter
loadAlertFilter(DetectionAlertConfigDTO alertConfig,
Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor));
Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor).get("className"));
Constructor<?> constructor =
Class.forName(alertSuppressors.get(alertSuppressor).get("className").toString().trim())
- .getConstructor(DetectionAlertConfigDTO.class,
ThirdEyeAnomalyConfiguration.class);
- detectionAlertSuppressors.add((DetectionAlertSuppressor)
constructor.newInstance(alertConfig, thirdeyeConfig));
+ .getConstructor(DetectionAlertConfigDTO.class);
+ detectionAlertSuppressors.add((DetectionAlertSuppressor)
constructor.newInstance(alertConfig));
}
return detectionAlertSuppressors;
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 4bc9b3a0bf..669b442334 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -101,8 +101,7 @@ private void
updateAlertConfigWatermarks(DetectionAlertFilterResult result, Dete
DetectionAlertFilterResult result = alertFilter.run();
// Suppress alerts if any and get the filtered anomalies to be notified
- Set<DetectionAlertSuppressor> alertSuppressors =
- detAlertTaskFactory.loadAlertSuppressors(alertConfig,
taskContext.getThirdEyeAnomalyConfiguration());
+ Set<DetectionAlertSuppressor> alertSuppressors =
detAlertTaskFactory.loadAlertSuppressors(alertConfig);
for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) {
result = alertSuppressor.run(result);
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
index 72a67b040a..6ff87b229a 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
@@ -1,5 +1,6 @@
package com.linkedin.thirdeye.detection.alert.suppress;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
@@ -11,5 +12,11 @@
*/
public abstract class DetectionAlertSuppressor {
+ protected final DetectionAlertConfigDTO config; // alert config this suppressor was built with; readable by subclasses
+ // Stores the given alert config as-is; no null check is performed here.
+ public DetectionAlertSuppressor(DetectionAlertConfigDTO config) {
+ this.config = config;
+ }
+ // Takes an alert filter result and returns an alert filter result; what gets suppressed is up to the subclass.
public abstract DetectionAlertFilterResult run(DetectionAlertFilterResult
result) throws Exception;
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
new file mode 100644
index 0000000000..9fd5ded544
--- /dev/null
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
@@ -0,0 +1,127 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyFeedback;
+import com.linkedin.thirdeye.constant.AnomalyFeedbackType;
+import com.linkedin.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datasource.DAORegistry;
+import com.linkedin.thirdeye.detection.ConfigUtils;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Suppress alerts from anomalies generated during a specific time period.
+ *
+ * This class enables 2 ways of suppressing alerts
+ * 1. Suppress all the alerts generated during the time window. No alerts will
be sent.
+ * ({@link #WINDOW_START_TIME_KEY} and {@link #WINDOW_END_TIME_KEY})
+ * 2. Suppress alerts in the time window based on some thresholds.
+ * ({@link #EXPECTED_CHANGE_KEY} and {@link #ACCEPTABLE_DEVIATION_KEY})
+ */
+public class DetectionAlertTimeWindowSuppressor extends DetectionAlertSuppressor {
+  private static final Logger LOG = LoggerFactory.getLogger(DetectionAlertTimeWindowSuppressor.class);
+
+  // Key of this suppressor's entry in the alert config's suppressor map
+  static final String TIME_WINDOW_SUPPRESSOR_KEY = "timeWindowSuppressor";
+  // Key, inside that entry, of the list of suppression windows
+  static final String TIME_WINDOWS_KEY = "timeWindows";
+
+  // Window bounds, compared against the anomaly start time (start inclusive, end exclusive)
+  static final String WINDOW_START_TIME_KEY = "windowStartTime";
+  static final String WINDOW_END_TIME_KEY = "windowEndTime";
+  // When present and true, a window only suppresses anomalies whose weight is within the expected range
+  static final String IS_THRESHOLD_KEY = "isThresholdApplied";
+
+  // The expected rise or fall of a metric during the holiday or suppression period (ex: -0.5 for 50% drop)
+  static final String EXPECTED_CHANGE_KEY = "expectedChange";
+
+  // The acceptable deviation from the dropped/risen value during the suppression period (ex: 0.1 for +/- 10%)
+  static final String ACCEPTABLE_DEVIATION_KEY = "acceptableDeviation";
+
+  public DetectionAlertTimeWindowSuppressor(DetectionAlertConfigDTO config) {
+    super(config);
+  }
+
+  /**
+   * Checks whether the anomaly's weight lies within
+   * [expectedChange - acceptableDeviation, expectedChange + acceptableDeviation], both ends inclusive.
+   *
+   * @param anomaly the anomaly whose weight is tested
+   * @param suppressWindowProps properties of one suppression window; must hold numeric values under
+   *        {@link #EXPECTED_CHANGE_KEY} and {@link #ACCEPTABLE_DEVIATION_KEY}
+   * @return true if the weight is within the range
+   * @throws NullPointerException if either threshold is missing
+   * @throws ClassCastException if either threshold is not a number
+   */
+  private boolean isAnomalySuppressedByThreshold(MergedAnomalyResultDTO anomaly,
+      Map<String, Object> suppressWindowProps) {
+    // Read through Number so that integral config values (e.g. 1 instead of 1.0) are accepted too
+    double expectedDropOrSpike = ((Number) suppressWindowProps.get(EXPECTED_CHANGE_KEY)).doubleValue();
+    double acceptableDeviation = ((Number) suppressWindowProps.get(ACCEPTABLE_DEVIATION_KEY)).doubleValue();
+    double anomalyWeight = anomaly.getWeight();
+    if (anomalyWeight <= (expectedDropOrSpike + acceptableDeviation)
+        && anomalyWeight >= (expectedDropOrSpike - acceptableDeviation)) {
+      // Four placeholders, four arguments: the anomaly id has to be passed explicitly
+      LOG.info("Anomaly id {} falls within the specified thresholds (anomalyWeight = {}, expectedDropOrSpike = {},"
+          + " acceptableDeviation = {})", anomaly.getId(), anomalyWeight, expectedDropOrSpike, acceptableDeviation);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if the anomaly needs to be suppressed. An anomaly is suppressed if the startTime
+   * of the anomaly falls in the suppression time window [windowStartTime, windowEndTime) and,
+   * when the window has {@link #IS_THRESHOLD_KEY} set to true, its weight is within the
+   * user's expected range.
+   *
+   * Any exception raised while reading the window properties (missing or wrongly typed values)
+   * is logged and the anomaly is treated as not suppressed by this window.
+   *
+   * @param anomaly the anomaly to test
+   * @param suppressWindowProps properties of one suppression window
+   * @return true if this window suppresses the anomaly
+   */
+  private boolean isAnomalySuppressed(MergedAnomalyResultDTO anomaly, Map<String, Object> suppressWindowProps) {
+    boolean shouldSuppress = false;
+    try {
+      // Read through Number so that times stored as Integer (small values) are accepted as well as Long
+      long windowStartTime = ((Number) suppressWindowProps.get(WINDOW_START_TIME_KEY)).longValue();
+      long windowEndTime = ((Number) suppressWindowProps.get(WINDOW_END_TIME_KEY)).longValue();
+      if (anomaly.getStartTime() >= windowStartTime && anomaly.getStartTime() < windowEndTime) {
+        LOG.info("Anomaly id {} falls in the suppression time window ({}, {})", anomaly.getId(), windowStartTime,
+            windowEndTime);
+        if (suppressWindowProps.get(IS_THRESHOLD_KEY) != null && (Boolean) suppressWindowProps.get(IS_THRESHOLD_KEY)) {
+          shouldSuppress = isAnomalySuppressedByThreshold(anomaly, suppressWindowProps);
+        } else {
+          shouldSuppress = true;
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while suppressing anomaly id {} with suppress window properties {}", anomaly.getId(),
+          suppressWindowProps, e);
+    }
+
+    return shouldSuppress;
+  }
+
+  /**
+   * Removes from the given set every anomaly that is suppressed by at least one configured window,
+   * labels it as a true anomaly and persists that feedback.
+   *
+   * @param anomalies the anomalies to filter; modified in place
+   */
+  private void filterOutSuppressedAnomalies(final Set<MergedAnomalyResultDTO> anomalies) {
+    Iterator<MergedAnomalyResultDTO> anomaliesIt = anomalies.iterator();
+    MergedAnomalyResultManager anomalyMergedResultDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+
+    List<Map<String, Object>> suppressWindowPropsList
+        = ConfigUtils.getList(config.getAlertSuppressors().get(TIME_WINDOW_SUPPRESSOR_KEY).get(TIME_WINDOWS_KEY));
+
+    while (anomaliesIt.hasNext()) {
+      MergedAnomalyResultDTO anomaly = anomaliesIt.next();
+      for (Map<String, Object> suppressWindowProps : suppressWindowPropsList) {
+        if (isAnomalySuppressed(anomaly, suppressWindowProps)) {
+          LOG.info("Suppressing anomaly id {} with suppress properties {}. Anomaly Details = {}", anomaly.getId(),
+              suppressWindowProps, anomaly);
+          anomaliesIt.remove();
+          AnomalyFeedback feedback = anomaly.getFeedback();
+          if (feedback == null) {
+            feedback = new AnomalyFeedbackDTO();
+          }
+
+          // Suppressing is a way by which users admit that anomalies during this period
+          // are expected. We also do not want the algorithm to readjust the baseline.
+          feedback.setFeedbackType(AnomalyFeedbackType.ANOMALY);
+          feedback.setComment("Suppressed anomaly. Auto-labeling as true anomaly.");
+
+          anomaly.setFeedback(feedback);
+          anomalyMergedResultDAO.updateAnomalyFeedback(anomaly);
+
+          // The first matching window settles it. Checking further windows would call
+          // Iterator.remove() a second time for the same element (IllegalStateException)
+          // and persist the same feedback again.
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Filters suppressed anomalies out of every anomaly set in the given result.
+   *
+   * @param results the alert filter result; its anomaly sets are modified in place
+   * @return the same {@code results} instance, after filtering
+   * @throws NullPointerException if {@code results} is null
+   */
+  @Override
+  public DetectionAlertFilterResult run(DetectionAlertFilterResult results) throws Exception {
+    Preconditions.checkNotNull(results);
+    for (Set<MergedAnomalyResultDTO> anomalies : results.getResult().values()) {
+      filterOutSuppressedAnomalies(anomalies);
+    }
+
+    return results;
+  }
+}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
new file mode 100644
index 0000000000..2dddd4a869
--- /dev/null
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
@@ -0,0 +1,157 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.linkedin.thirdeye.datalayer.bao.DAOTestBase;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static
com.linkedin.thirdeye.detection.alert.suppress.DetectionAlertTimeWindowSuppressor.*;
+
+
+public class DetectionTimeWindowSuppressorTest {
+ // Exercises DetectionAlertTimeWindowSuppressor with threshold-based windows and with a plain time window.
+ private DAOTestBase testDAOProvider;
+ private Set<MergedAnomalyResultDTO> anomalies;
+ private DetectionAlertConfigDTO config;
+ // Builds the property map of one suppression window, using the suppressor's own config keys.
+ private Map<String, Object> createSuppressWindow(long startTime, long
endTime, boolean isThreshold, double expectedChange,
+ double acceptableDeviation) {
+ Map<String, Object> suppressWindowProps = new HashMap<>();
+ suppressWindowProps.put(WINDOW_START_TIME_KEY, startTime);
+ suppressWindowProps.put(WINDOW_END_TIME_KEY, endTime);
+ suppressWindowProps.put(IS_THRESHOLD_KEY, isThreshold);
+ suppressWindowProps.put(EXPECTED_CHANGE_KEY, expectedChange);
+ suppressWindowProps.put(ACCEPTABLE_DEVIATION_KEY, acceptableDeviation);
+ return suppressWindowProps;
+ }
+ // Builds an anomaly that carries only the given id, time range and weight.
+ private MergedAnomalyResultDTO createAnomaly(long id, long startTime, long
endTime, double weight) {
+ MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+ anomaly.setId(id);
+ anomaly.setStartTime(startTime);
+ anomaly.setEndTime(endTime);
+ anomaly.setWeight(weight);
+ return anomaly;
+ }
+ // Default config: two threshold windows, 1000-3000 expecting 0.5 +/- 0.1 and 4500-6000 expecting 0.6 +/- 0.2.
+ private void initDetectionAlertConfig() {
+ config = new DetectionAlertConfigDTO();
+
+ List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+ suppressWindowList.add(createSuppressWindow(1000, 3000, true, 0.5, 0.1));
+ suppressWindowList.add(createSuppressWindow(4500, 6000, true, 0.6, 0.2));
+
+ Map<String, Object> params = new HashMap<>();
+ params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+ Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+ alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+ config.setAlertSuppressors(alertSuppressors);
+ }
+ // Ten anomalies (ids 1-10), laid out as in the diagram on testTimeWindowSuppressorWithThreshold.
+ private void initAnomalies() {
+ anomalies = new HashSet<>();
+
+ anomalies.add(createAnomaly(1l, 500, 900, 0.5));
+ anomalies.add(createAnomaly(2l, 700, 1000, 0.8));
+ anomalies.add(createAnomaly(3l, 500, 1500, 0.2));
+ anomalies.add(createAnomaly(4l, 1000, 1500, 0.4));
+ anomalies.add(createAnomaly(5l, 1500, 2500, 0.6));
+ anomalies.add(createAnomaly(6l, 2500, 3000, 0.7));
+ anomalies.add(createAnomaly(7l, 2000, 3500, 0.5));
+ anomalies.add(createAnomaly(8l, 3000, 3500, 0.6));
+ anomalies.add(createAnomaly(9l, 3500, 4000, 0.1));
+ anomalies.add(createAnomaly(10l, 5000, 5500, 0.5));
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ this.testDAOProvider = DAOTestBase.getInstance();
+ }
+
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+ initAnomalies();
+ initDetectionAlertConfig();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() {
+ testDAOProvider.cleanup();
+ }
+
+ /**
+ * Anomaly distribution along with suppression windows.
+ *
+ * Anomalies 4, 5, 7, and 10 should be suppressed (not notified).
+ * Anomaly 6 starts inside window1 but is not suppressed because its weight (0.7)
+ * lies outside that window's accepted range (0.5 +/- 0.1).
+ * Anomalies 1, 2, 3, 8 and 9 start outside both windows (a window's end time is exclusive),
+ * so they are not suppressed either.
+ *
+ * *-----3----* *------7-------*
+ * |
+ * | *-2-* *----5----* *--8-*
+ * | | |
+ * *-1-* *--4-* *--6-* *--9-* *---10---*
+ * | | |
+ * _____|_____|___________________|______________|________________|
+ * | | | | |
+ * 500 | | | |
+ * 1000----<window1>----3000 4500--<window2>--6000
+ */
+ @Test
+ public void testTimeWindowSuppressorWithThreshold() throws Exception {
+
+ DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+ result.addMapping(new
DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+ DetectionAlertTimeWindowSuppressor suppressor = new
DetectionAlertTimeWindowSuppressor(config);
+ DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+ Set<Long> filteredAnomalyIds = new HashSet<>(Arrays.asList(1l, 2l, 3l, 6l,
8l, 9l));
+
+ Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 6);
+ for (MergedAnomalyResultDTO anomaly :
resultsAfterSuppress.getAllAnomalies()) {
+ Assert.assertTrue(filteredAnomalyIds.contains(anomaly.getId()));
+ }
+ }
+
+ /**
+ * Time window suppressor without thresholds: a single window (500 to 6000) contains the
+ * start time of every anomaly, so all anomalies are expected to be suppressed.
+ */
+ @Test
+ public void testTimeWindowSuppressor() throws Exception {
+ List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+ suppressWindowList.add(createSuppressWindow(500, 6000, false, 0, 0));
+
+ Map<String, Object> params = new HashMap<>();
+ params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+ Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+ alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+ config.setAlertSuppressors(alertSuppressors);
+
+ DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+ result.addMapping(new
DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+ DetectionAlertTimeWindowSuppressor suppressor = new
DetectionAlertTimeWindowSuppressor(config);
+ DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+ Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 0);
+ }
+
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]