This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b4fd51  [TE] Implement Threshold based Time Window Suppressor (#3463)
0b4fd51 is described below

commit 0b4fd51e7e442d07d3caf95117e1d9cc4e0e33ab
Author: Akshay Rai <[email protected]>
AuthorDate: Tue Nov 13 15:12:00 2018 -0800

    [TE] Implement Threshold based Time Window Suppressor (#3463)
---
 .../detection/alert/DetectionAlertTaskFactory.java |   7 +-
 .../detection/alert/DetectionAlertTaskRunner.java  |   3 +-
 .../alert/suppress/DetectionAlertSuppressor.java   |   7 +
 .../DetectionAlertTimeWindowSuppressor.java        | 127 +++++++++++++++++
 .../DetectionTimeWindowSuppressorTest.java         | 157 +++++++++++++++++++++
 5 files changed, 295 insertions(+), 6 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
index 3a51c95..61d118f 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
@@ -85,8 +85,7 @@ public class DetectionAlertTaskFactory {
     return detectionAlertSchemeSet;
   }
 
-  public Set<DetectionAlertSuppressor> 
loadAlertSuppressors(DetectionAlertConfigDTO alertConfig,
-      ThirdEyeAnomalyConfiguration thirdeyeConfig) throws Exception {
+  public Set<DetectionAlertSuppressor> 
loadAlertSuppressors(DetectionAlertConfigDTO alertConfig) throws Exception {
     Preconditions.checkNotNull(alertConfig);
     Set<DetectionAlertSuppressor> detectionAlertSuppressors = new HashSet<>();
     Map<String, Map<String, Object>> alertSuppressors = 
alertConfig.getAlertSuppressors();
@@ -99,8 +98,8 @@ public class DetectionAlertTaskFactory {
       Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor));
       
Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor).get("className"));
       Constructor<?> constructor = 
Class.forName(alertSuppressors.get(alertSuppressor).get("className").toString().trim())
-          .getConstructor(DetectionAlertConfigDTO.class, 
ThirdEyeAnomalyConfiguration.class);
-      detectionAlertSuppressors.add((DetectionAlertSuppressor) 
constructor.newInstance(alertConfig, thirdeyeConfig));
+          .getConstructor(DetectionAlertConfigDTO.class);
+      detectionAlertSuppressors.add((DetectionAlertSuppressor) 
constructor.newInstance(alertConfig));
     }
 
     return detectionAlertSuppressors;
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 4bc9b3a..669b442 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -101,8 +101,7 @@ public class DetectionAlertTaskRunner implements TaskRunner 
{
       DetectionAlertFilterResult result = alertFilter.run();
 
       // Suppress alerts if any and get the filtered anomalies to be notified
-      Set<DetectionAlertSuppressor> alertSuppressors =
-          detAlertTaskFactory.loadAlertSuppressors(alertConfig, 
taskContext.getThirdEyeAnomalyConfiguration());
+      Set<DetectionAlertSuppressor> alertSuppressors = 
detAlertTaskFactory.loadAlertSuppressors(alertConfig);
       for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) {
         result = alertSuppressor.run(result);
       }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
index 72a67b0..6ff87b2 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
@@ -1,5 +1,6 @@
 package com.linkedin.thirdeye.detection.alert.suppress;
 
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
 
 
@@ -11,5 +12,11 @@ import 
com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
  */
 public abstract class DetectionAlertSuppressor {
 
+  protected final DetectionAlertConfigDTO config;
+
+  public DetectionAlertSuppressor(DetectionAlertConfigDTO config) {
+    this.config = config;
+  }
+
   public abstract DetectionAlertFilterResult run(DetectionAlertFilterResult 
result) throws Exception;
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
new file mode 100644
index 0000000..9fd5ded
--- /dev/null
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
@@ -0,0 +1,127 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyFeedback;
+import com.linkedin.thirdeye.constant.AnomalyFeedbackType;
+import com.linkedin.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datasource.DAORegistry;
+import com.linkedin.thirdeye.detection.ConfigUtils;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Suppress alerts from anomalies generated during a specific time period.
+ *
+ * This class enables 2 ways of suppressing alerts
+ * 1. Suppress all the alerts generated during the time window. No alerts will 
be sent.
+ *    ({@link #WINDOW_START_TIME_KEY} and {@link #WINDOW_END_TIME_KEY})
+ * 2. Suppress alerts in the time window based on some thresholds.
+ *    ({@link #EXPECTED_CHANGE_KEY} and {@link #ACCEPTABLE_DEVIATION_KEY})
+ */
+public class DetectionAlertTimeWindowSuppressor extends 
DetectionAlertSuppressor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DetectionAlertTimeWindowSuppressor.class);
+
+  static final String TIME_WINDOW_SUPPRESSOR_KEY = "timeWindowSuppressor";
+  static final String TIME_WINDOWS_KEY = "timeWindows";
+
+  static final String WINDOW_START_TIME_KEY = "windowStartTime";
+  static final String WINDOW_END_TIME_KEY = "windowEndTime";
+  static final String IS_THRESHOLD_KEY = "isThresholdApplied";
+
+  // The expected rise or fall of a metric during the holiday or suppression 
period (ex: -0.5 for 50% drop)
+  static final String EXPECTED_CHANGE_KEY = "expectedChange";
+
+  // The acceptable deviation from the dropped/risen value during the 
suppression period (ex: 0.1 for +/- 10%)
+  static final String ACCEPTABLE_DEVIATION_KEY = "acceptableDeviation";
+
+  public DetectionAlertTimeWindowSuppressor(DetectionAlertConfigDTO config) {
+    super(config);
+  }
+
+  private boolean isAnomalySuppressedByThreshold(double anomalyWeight, 
Map<String, Object> suppressWindowProps) {
+    double expectedDropOrSpike = (Double) 
suppressWindowProps.get(EXPECTED_CHANGE_KEY);
+    double acceptableDeviation = (Double) 
suppressWindowProps.get(ACCEPTABLE_DEVIATION_KEY);
+    if (anomalyWeight <= (expectedDropOrSpike + acceptableDeviation)
+        && anomalyWeight >= (expectedDropOrSpike - acceptableDeviation)) {
+      LOG.info("Anomaly id {} falls within the specified thresholds 
(anomalyWeight = {}, expectedDropOrSpike = {},"
+              + " acceptableDeviation = {})", anomalyWeight, 
expectedDropOrSpike, acceptableDeviation);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if the anomaly needs to be suppressed. An anomaly is suppressed if 
the startTime
+   * of the anomaly falls in the suppression time window and is within the 
user's expected range.
+   */
+  private boolean isAnomalySuppressed(MergedAnomalyResultDTO anomaly, 
Map<String, Object> suppressWindowProps) {
+    boolean shouldSuppress = false;
+    try {
+      long windowStartTime = (Long) 
suppressWindowProps.get(WINDOW_START_TIME_KEY);
+      long windowEndTime = (Long) suppressWindowProps.get(WINDOW_END_TIME_KEY);
+      if (anomaly.getStartTime() >= windowStartTime && anomaly.getStartTime() 
< windowEndTime) {
+        LOG.info("Anomaly id {} falls in the suppression time window ({}, 
{})", anomaly.getId(), windowStartTime, windowEndTime);
+        if (suppressWindowProps.get(IS_THRESHOLD_KEY) != null && (Boolean) 
suppressWindowProps.get(IS_THRESHOLD_KEY)) {
+          shouldSuppress = isAnomalySuppressedByThreshold(anomaly.getWeight(), 
suppressWindowProps);
+        } else {
+          shouldSuppress = true;
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while suppressing anomaly id {} with suppress window 
properties {}", anomaly.getId(),
+          suppressWindowProps, e);
+    }
+
+    return shouldSuppress;
+  }
+
+  private void filterOutSuppressedAnomalies(final Set<MergedAnomalyResultDTO> 
anomalies) {
+    Iterator<MergedAnomalyResultDTO> anomaliesIt = anomalies.iterator();
+    MergedAnomalyResultManager anomalyMergedResultDAO = 
DAORegistry.getInstance().getMergedAnomalyResultDAO();
+
+    List<Map<String, Object>> suppressWindowPropsList
+        = 
ConfigUtils.getList(config.getAlertSuppressors().get(TIME_WINDOW_SUPPRESSOR_KEY).get(TIME_WINDOWS_KEY));
+
+    while (anomaliesIt.hasNext()) {
+      MergedAnomalyResultDTO anomaly = anomaliesIt.next();
+      for (Map<String, Object> suppressWindowProps : suppressWindowPropsList) {
+        if (isAnomalySuppressed(anomaly, suppressWindowProps)) {
+          LOG.info("Suppressing anomaly id {} with suppress properties {}. 
Anomaly Details = {}", anomaly.getId(), suppressWindowProps, anomaly);
+          anomaliesIt.remove();
+          AnomalyFeedback feedback = anomaly.getFeedback();
+          if (feedback == null) {
+            feedback = new AnomalyFeedbackDTO();
+          }
+
+          // Suppressing is a way by which users admit that anomalies during 
this period
+          // are expected. We also do not want the algorithm to readjust the 
baseline.
+          feedback.setFeedbackType(AnomalyFeedbackType.ANOMALY);
+          feedback.setComment("Suppressed anomaly. Auto-labeling as true 
anomaly.");
+
+          anomaly.setFeedback(feedback);
+          anomalyMergedResultDAO.updateAnomalyFeedback(anomaly);
+        }
+      }
+    }
+  }
+
+  @Override
+  public DetectionAlertFilterResult run(DetectionAlertFilterResult results) 
throws Exception {
+    Preconditions.checkNotNull(results);
+    for (Set<MergedAnomalyResultDTO> anomalies : results.getResult().values()) 
{
+      filterOutSuppressedAnomalies(anomalies);
+    }
+
+    return results;
+  }
+}
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
new file mode 100644
index 0000000..2dddd4a
--- /dev/null
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
@@ -0,0 +1,157 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.linkedin.thirdeye.datalayer.bao.DAOTestBase;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static 
com.linkedin.thirdeye.detection.alert.suppress.DetectionAlertTimeWindowSuppressor.*;
+
+
+public class DetectionTimeWindowSuppressorTest {
+
+  private DAOTestBase testDAOProvider;
+  private Set<MergedAnomalyResultDTO> anomalies;
+  private DetectionAlertConfigDTO config;
+
+  private Map<String, Object> createSuppressWindow(long startTime, long 
endTime, boolean isThreshold, double expectedChange,
+      double acceptableDeviation) {
+    Map<String, Object> suppressWindowProps = new HashMap<>();
+    suppressWindowProps.put(WINDOW_START_TIME_KEY, startTime);
+    suppressWindowProps.put(WINDOW_END_TIME_KEY, endTime);
+    suppressWindowProps.put(IS_THRESHOLD_KEY, isThreshold);
+    suppressWindowProps.put(EXPECTED_CHANGE_KEY, expectedChange);
+    suppressWindowProps.put(ACCEPTABLE_DEVIATION_KEY, acceptableDeviation);
+    return suppressWindowProps;
+  }
+
+  private MergedAnomalyResultDTO createAnomaly(long id, long startTime, long 
endTime, double weight) {
+    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+    anomaly.setId(id);
+    anomaly.setStartTime(startTime);
+    anomaly.setEndTime(endTime);
+    anomaly.setWeight(weight);
+    return anomaly;
+  }
+
+  private void initDetectionAlertConfig() {
+    config = new DetectionAlertConfigDTO();
+
+    List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+    suppressWindowList.add(createSuppressWindow(1000, 3000, true, 0.5, 0.1));
+    suppressWindowList.add(createSuppressWindow(4500, 6000, true, 0.6, 0.2));
+
+    Map<String, Object> params = new HashMap<>();
+    params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+    Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+    alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+    config.setAlertSuppressors(alertSuppressors);
+  }
+
+  private void initAnomalies() {
+    anomalies = new HashSet<>();
+
+    anomalies.add(createAnomaly(1l, 500, 900, 0.5));
+    anomalies.add(createAnomaly(2l, 700, 1000, 0.8));
+    anomalies.add(createAnomaly(3l, 500, 1500, 0.2));
+    anomalies.add(createAnomaly(4l, 1000, 1500, 0.4));
+    anomalies.add(createAnomaly(5l, 1500, 2500, 0.6));
+    anomalies.add(createAnomaly(6l, 2500, 3000, 0.7));
+    anomalies.add(createAnomaly(7l, 2000, 3500, 0.5));
+    anomalies.add(createAnomaly(8l, 3000, 3500, 0.6));
+    anomalies.add(createAnomaly(9l, 3500, 4000, 0.1));
+    anomalies.add(createAnomaly(10l, 5000, 5500, 0.5));
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    this.testDAOProvider = DAOTestBase.getInstance();
+  }
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    initAnomalies();
+    initDetectionAlertConfig();
+  }
+
+  @AfterClass(alwaysRun = true)
+  void afterClass() {
+    testDAOProvider.cleanup();
+  }
+
+  /**
+   * Anomaly distribution along with suppression windows.
+   *
+   * Anomalies 4, 5, 7, and 10 should be suppressed (not notified).
+   * Anomaly 6 is not suppressed because it falls outside the suppression 
region.
+   *
+   *      *-----3----*    *------7-------*
+   *      |
+   *      | *-2-*    *----5----*    *--8-*
+   *      |     |                   |
+   *      *-1-* *--4-*         *--6-*    *--9-*        *---10---*
+   *      |     |                   |
+   * _____|_____|___________________|______________|________________|
+   *      |     |                   |              |                |
+   *    500     |                   |              |                |
+   *          1000----<window1>----3000          4500--<window2>--6000
+   */
+  @Test
+  public void testTimeWindowSuppressorWithThreshold() throws Exception {
+
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+    result.addMapping(new 
DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+    DetectionAlertTimeWindowSuppressor suppressor = new 
DetectionAlertTimeWindowSuppressor(config);
+    DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+    Set<Long> filteredAnomalyIds = new HashSet<>(Arrays.asList(1l, 2l, 3l, 6l, 
8l, 9l));
+
+    Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 6);
+    for (MergedAnomalyResultDTO anomaly : 
resultsAfterSuppress.getAllAnomalies()) {
+      Assert.assertTrue(filteredAnomalyIds.contains(anomaly.getId()));
+    }
+  }
+
+  /**
+   * Overlapping time window suppressor without thresholds
+   */
+  @Test
+  public void testTimeWindowSuppressor() throws Exception {
+    List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+    suppressWindowList.add(createSuppressWindow(500, 6000, false, 0, 0));
+
+    Map<String, Object> params = new HashMap<>();
+    params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+    Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+    alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+    config.setAlertSuppressors(alertSuppressors);
+
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+    result.addMapping(new 
DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+    DetectionAlertTimeWindowSuppressor suppressor = new 
DetectionAlertTimeWindowSuppressor(config);
+    DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+    Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 0);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to