akshayrai closed pull request #3495: [TE] New detection alerter to support 
alerting of legacy anomalies
URL: https://github.com/apache/incubator-pinot/pull/3495
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
index 5d4c825ba2..07984f265f 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
@@ -36,6 +36,7 @@
   String from;
   String cronExpression;
   String application;
+  boolean onlyFetchLegacyAnomalies;
 
   Map<String, Map<String, Object>> alertSchemes;
   Map<String, Map<String, Object>> alertSuppressors;
@@ -48,6 +49,14 @@
 
   Map<String, String> refLinks;
 
+  public boolean isOnlyFetchLegacyAnomalies() {
+    return onlyFetchLegacyAnomalies;
+  }
+
+  public void setOnlyFetchLegacyAnomalies(boolean onlyFetchLegacyAnomalies) {
+    this.onlyFetchLegacyAnomalies = onlyFetchLegacyAnomalies;
+  }
+
   public boolean isActive() {
     return active;
   }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
index 51e1a91d0d..c2e70e5ba0 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
@@ -65,6 +65,18 @@
    */
   Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, 
List<String> dimensions);
 
+  /**
+   * Returns a multimap of anomalies (keyed by slice) generated by the legacy 
detection pipeline.
+   *
+   * @see MergedAnomalyResultDTO
+   * @see AnomalySlice
+   *
+   * @param slices anomaly slice
+   * @param configId configId
+   * @return multimap of anomalies (keyed by slice)
+   */
+  Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchLegacyAnomalies(Collection<AnomalySlice> slices, long configId);
+
   /**
    * Returns a multimap of anomalies (keyed by slice) for a given set of 
slices.
    *
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index 4ea926b3b0..a13138d486 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -32,6 +32,7 @@
 import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
+import com.linkedin.thirdeye.detection.alert.StatefulDetectionAlertFilter;
 import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.util.ArrayList;
@@ -45,9 +46,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class DefaultDataProvider implements DataProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDataProvider.class);
   private static final long TIMEOUT = 60000;
 
   private final ExecutorService executor = Executors.newCachedThreadPool();
@@ -122,39 +126,59 @@ public DataFrame call() throws Exception {
     }
   }
 
-  @Override
-  public Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+  private Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchAnomalies(Collection<AnomalySlice> slices,
+      long configId, boolean isLegacy) {
+    String functionIdKey = "detectionConfigId";
+    if (isLegacy) {
+      functionIdKey = "functionId";
+    }
+
     Multimap<AnomalySlice, MergedAnomalyResultDTO> output = 
ArrayListMultimap.create();
     for (AnomalySlice slice : slices) {
       List<Predicate> predicates = new ArrayList<>();
-      if (slice.getEnd() >= 0)
+      if (slice.getEnd() >= 0) {
         predicates.add(Predicate.LT("startTime", slice.getEnd()));
-      if (slice.getStart() >= 0)
+      }
+      if (slice.getStart() >= 0) {
         predicates.add(Predicate.GT("endTime", slice.getStart()));
-      if (configId >= 0)
-        predicates.add(Predicate.EQ("detectionConfigId", configId));
+      }
+      if (configId >= 0) {
+        predicates.add(Predicate.EQ(functionIdKey, configId));
+      }
 
-      if (predicates.isEmpty())
-        throw new IllegalArgumentException("Must provide at least one of 
start, end, or detectionConfigId");
+      if (predicates.isEmpty()) throw new IllegalArgumentException("Must 
provide at least one of start, end, or " + functionIdKey);
 
       List<MergedAnomalyResultDTO> anomalies = 
this.anomalyDAO.findByPredicate(AND(predicates));
-      Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
-      while (itAnomaly.hasNext()) {
-        MergedAnomalyResultDTO anomaly = itAnomaly.next();
-        if (configId >= 0 && (anomaly.getDetectionConfigId() == null || 
anomaly.getDetectionConfigId() != configId)){
-          itAnomaly.remove();
-        }
-
-        if (!slice.match(anomaly)) {
-          itAnomaly.remove();
-        }
+      anomalies.removeIf(anomaly -> !slice.match(anomaly));
+
+      if (isLegacy) {
+        anomalies.removeIf(anomaly ->
+            (configId >= 0) && (anomaly.getFunctionId() == null || 
anomaly.getFunctionId() != configId)
+        );
+      } else {
+        anomalies.removeIf(anomaly ->
+            (configId >= 0) && (anomaly.getDetectionConfigId() == null || 
anomaly.getDetectionConfigId() != configId)
+        );
       }
 
+      LOG.info("Fetched {} legacy anomalies between (startTime = {}, endTime = 
{}) with confid Id = {}", anomalies.size(),
+          slice.getStart(), slice.getEnd(), configId);
       output.putAll(slice, anomalies);
     }
+
     return output;
   }
 
+  @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchLegacyAnomalies(Collection<AnomalySlice> slices, long configId) {
+    return fetchAnomalies(slices, configId, true);
+  }
+
+  @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+    return fetchAnomalies(slices, configId, false);
+  }
+
   @Override
   public Multimap<EventSlice, EventDTO> fetchEvents(Collection<EventSlice> 
slices) {
     Multimap<EventSlice, EventDTO> output = ArrayListMultimap.create();
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
index 3887ca64dc..5d655e2e0b 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
@@ -102,6 +102,11 @@ private static long 
getLastTimeStamp(Collection<MergedAnomalyResultDTO> anomalie
       @Nullable
       @Override
       public Long apply(@Nullable MergedAnomalyResultDTO 
mergedAnomalyResultDTO) {
+        // Return functionId to support alerting of legacy anomalies
+        if (mergedAnomalyResultDTO.getDetectionConfigId() == null) {
+          return mergedAnomalyResultDTO.getFunctionId();
+        }
+
         return mergedAnomalyResultDTO.getDetectionConfigId();
       }
     });
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 669b442334..50285cf6c3 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -85,6 +85,8 @@ private void 
updateAlertConfigWatermarks(DetectionAlertFilterResult result, Dete
         AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(),
         AlertUtils.makeVectorClock(result.getAllAnomalies()))
     );
+
+    LOG.info("Saving watermarks for alertConfigDAO : {}", 
alertConfig.toString());
     this.alertConfigDAO.save(alertConfig);
   }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
index 89db67332e..3b2044c464 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
@@ -33,6 +33,7 @@
 
 
 public abstract class StatefulDetectionAlertFilter extends 
DetectionAlertFilter {
+
   public StatefulDetectionAlertFilter(DataProvider provider, 
DetectionAlertConfigDTO config, long endTime) {
     super(provider, config, endTime);
   }
@@ -47,11 +48,16 @@ public DetectionAlertFilterResult run() throws Exception {
   protected final Set<MergedAnomalyResultDTO> filter(Map<Long, Long> 
vectorClocks, final long minId) {
     // retrieve all candidate anomalies
     Set<MergedAnomalyResultDTO> allAnomalies = new HashSet<>();
-    for (Long detectionConfigId : vectorClocks.keySet()) {
-      long startTime = vectorClocks.get(detectionConfigId);
+    for (Long functionId : vectorClocks.keySet()) {
+      long startTime = vectorClocks.get(functionId);
 
       AnomalySlice slice = new 
AnomalySlice().withStart(startTime).withEnd(this.endTime);
-      Collection<MergedAnomalyResultDTO> candidates = 
this.provider.fetchAnomalies(Collections.singletonList(slice), 
detectionConfigId).get(slice);
+      Collection<MergedAnomalyResultDTO> candidates;
+      if (this.config.isOnlyFetchLegacyAnomalies()) {
+        candidates = 
this.provider.fetchLegacyAnomalies(Collections.singletonList(slice), 
functionId).get(slice);
+      } else {
+        candidates = 
this.provider.fetchAnomalies(Collections.singletonList(slice), 
functionId).get(slice);
+      }
 
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new 
Predicate<MergedAnomalyResultDTO>() {
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
index 05b0454ec8..488f1f2748 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
@@ -70,13 +70,19 @@ public LegacyAlertFilter(DataProvider provider, 
DetectionAlertConfigDTO config,
   public DetectionAlertFilterResult run() {
     DetectionAlertFilterResult result = new DetectionAlertFilterResult();
 
-    for (Long detectionConfigId : this.detectionConfigIds) {
-      long startTime = MapUtils.getLong(this.vectorClocks, detectionConfigId, 
0L);
+    for (Long functionId : this.detectionConfigIds) {
+      long startTime = MapUtils.getLong(this.vectorClocks, functionId, 0L);
 
-      AnomalySlice slice =
-          new AnomalySlice().withStart(startTime).withEnd(this.endTime);
-      Collection<MergedAnomalyResultDTO> candidates =
-          this.provider.fetchAnomalies(Collections.singletonList(slice), 
detectionConfigId).get(slice);
+      AnomalySlice slice = new AnomalySlice()
+          .withStart(startTime)
+          .withEnd(this.endTime);
+
+      Collection<MergedAnomalyResultDTO> candidates;
+      if (this.config.isOnlyFetchLegacyAnomalies()) {
+        candidates = 
this.provider.fetchLegacyAnomalies(Collections.singletonList(slice), 
functionId).get(slice);
+      } else {
+        candidates = 
this.provider.fetchAnomalies(Collections.singletonList(slice), 
functionId).get(slice);
+      }
 
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new 
Predicate<MergedAnomalyResultDTO>() {
@@ -86,7 +92,11 @@ public boolean apply(@Nullable MergedAnomalyResultDTO 
mergedAnomaly) {
             }
           });
 
-      result.addMapping(this.alertConfig.getReceiverAddresses(), new 
HashSet<>(anomalies));
+      if (result.getResult().get(this.alertConfig.getReceiverAddresses()) == 
null) {
+        result.addMapping(this.alertConfig.getReceiverAddresses(), new 
HashSet<>(anomalies));
+      } else {
+        
result.getResult().get(this.alertConfig.getReceiverAddresses()).addAll(anomalies);
+      }
     }
 
     return result;
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
index 58b8bdbf12..3f14d92cc9 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
@@ -35,6 +35,7 @@
  * The detection alert filter that sends the anomaly email to all recipients
  */
 public class ToAllRecipientsDetectionAlertFilter extends 
StatefulDetectionAlertFilter {
+
   private static final String PROP_RECIPIENTS = "recipients";
   private static final String PROP_TO = "to";
   private static final String PROP_CC = "cc";
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
index 97495f1f9d..5d763e2fad 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
@@ -176,7 +176,7 @@ private void 
generateAndSendEmails(DetectionAlertFilterResult detectionResult) t
   public void run() throws Exception {
     Preconditions.checkNotNull(result);
     if (result.getAllAnomalies().size() == 0) {
-      LOG.info("Zero anomalies found, skipping sending iris alert for {}", 
config.getId());
+      LOG.info("Zero anomalies found, skipping sending email alert for {}", 
config.getId());
       return;
     }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
index 49ac6b0076..bf4f37e0dd 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
@@ -26,13 +26,14 @@
 public class DetectionTestUtils {
   private static final Long PROP_ID_VALUE = 1000L;
 
-  public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, 
long end, String metric, String dataset, Map<String, String> dimensions) {
+  public static MergedAnomalyResultDTO makeAnomaly(Long configId, Long 
legacyFunctionId, long start, long end, String metric, String dataset, 
Map<String, String> dimensions) {
     MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
     anomaly.setDetectionConfigId(configId);
     anomaly.setStartTime(start);
     anomaly.setEndTime(end);
     anomaly.setMetric(metric);
     anomaly.setCollection(dataset);
+    anomaly.setFunctionId(legacyFunctionId);
 
     DimensionMap dimMap = new DimensionMap();
     dimMap.putAll(dimensions);
@@ -41,10 +42,18 @@ public static MergedAnomalyResultDTO makeAnomaly(Long 
configId, long start, long
     return anomaly;
   }
 
+  public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, 
long end, String metric, String dataset, Map<String, String> dimensions) {
+    return DetectionTestUtils.makeAnomaly(configId, null, start, end, metric, 
dataset, dimensions);
+  }
+
   public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
     return DetectionTestUtils.makeAnomaly(PROP_ID_VALUE, start, end, null, 
null, Collections.<String, String>emptyMap());
   }
 
+  public static MergedAnomalyResultDTO makeAnomaly(Long configId, Long 
legacyFuncId, long start, long end) {
+    return DetectionTestUtils.makeAnomaly(configId, legacyFuncId, start, end, 
null, null, Collections.<String, String>emptyMap());
+  }
+
   public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, 
long end) {
     return DetectionTestUtils.makeAnomaly(configId, start, end, null, null, 
Collections.<String, String>emptyMap());
   }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
index 084e4dc05d..02770d150f 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
@@ -123,14 +123,20 @@ public boolean apply(long... values) {
     return result;
   }
 
-  @Override
-  public Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+  private Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchAnomalies(Collection<AnomalySlice> slices,
+      long configId, boolean isLegacy) {
     Multimap<AnomalySlice, MergedAnomalyResultDTO> result = 
ArrayListMultimap.create();
     for (AnomalySlice slice : slices) {
       for (MergedAnomalyResultDTO anomaly : this.anomalies) {
         if (slice.match(anomaly)) {
-          if (configId >= 0 && (anomaly.getDetectionConfigId() == null || 
anomaly.getDetectionConfigId() != configId)){
-            continue;
+          if (isLegacy) {
+            if (configId >= 0 && (anomaly.getFunctionId() == null || 
anomaly.getFunctionId() != configId)) {
+              continue;
+            }
+          } else {
+            if (configId >= 0 && (anomaly.getDetectionConfigId() == null || 
anomaly.getDetectionConfigId() != configId)) {
+              continue;
+            }
           }
           result.put(slice, anomaly);
         }
@@ -139,6 +145,18 @@ public boolean apply(long... values) {
     return result;
   }
 
+  @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchLegacyAnomalies(Collection<AnomalySlice> slices,
+      long configId) {
+    return fetchAnomalies(slices, configId, true);
+  }
+
+  @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> 
fetchAnomalies(Collection<AnomalySlice> slices,
+      long configId) {
+    return fetchAnomalies(slices, configId, false);
+  }
+
   @Override
   public Multimap<EventSlice, EventDTO> fetchEvents(Collection<EventSlice> 
slices) {
     Multimap<EventSlice, EventDTO> result = ArrayListMultimap.create();
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
index ef81d6fa86..b96bfa14d9 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
@@ -65,9 +65,9 @@
   private DetectionAlertFilter alertFilter;
   private List<MergedAnomalyResultDTO> detectedAnomalies;
 
-  private Map<String, Object> properties;
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
+  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -81,25 +81,38 @@ public void beforeMethod() {
     this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999, 
Collections.singletonMap("key", "value")));
     this.detectedAnomalies.add(makeAnomaly(1003L,1111, 9999, 
Collections.singletonMap("key", "value")));
 
-    this.provider = new 
MockDataProvider().setAnomalies(this.detectedAnomalies);
+    // Anomalies generated by legacy pipeline
+    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
 
-    this.alertConfig = new DetectionAlertConfigDTO();
+    this.provider = new MockDataProvider()
+        .setAnomalies(this.detectedAnomalies);
 
-    this.properties = new HashMap<>();
+    this.alertConfig = createDetectionAlertConfig();
+    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+    Map<String, Object> properties = new HashMap<>();
     Map<String, Set<String>> recipients = new HashMap<>();
     recipients.put(PROP_TO, PROP_TO_VALUE);
     recipients.put(PROP_CC, PROP_CC_VALUE);
     recipients.put(PROP_BCC, PROP_BCC_VALUE);
-    this.properties.put(PROP_RECIPIENTS, recipients);
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
-    this.properties.put(PROP_DIMENSION, PROP_DIMENSION_VALUE);
-    this.properties.put(PROP_DIMENSION_TO, PROP_DIMENSION_TO_VALUE);
+    properties.put(PROP_RECIPIENTS, recipients);
+    properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+    properties.put(PROP_DIMENSION, PROP_DIMENSION_VALUE);
+    properties.put(PROP_DIMENSION_TO, PROP_DIMENSION_TO_VALUE);
 
-    this.alertConfig.setProperties(this.properties);
+    alertConfig.setProperties(properties);
 
     Map<Long, Long> vectorClocks = new HashMap<>();
     vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
-    this.alertConfig.setVectorClocks(vectorClocks);
+    alertConfig.setVectorClocks(vectorClocks);
+
+    return alertConfig;
   }
 
   @Test
@@ -119,7 +132,7 @@ public void testAlertFilterRecipients() throws Exception {
 
   @Test
   public void testAlertFilterNoChildren() throws Exception {
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, 
Collections.singletonList(1003L));
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, 
Collections.singletonList(1003L));
     this.alertFilter = new DimensionDetectionAlertFilter(provider, 
alertConfig,2500L);
 
     MergedAnomalyResultDTO child = makeAnomaly(1003L, 1234, 9999);
@@ -136,7 +149,7 @@ public void testAlertFilterNoChildren() throws Exception {
 
   @Test
   public void testAlertFilterFeedback() throws Exception {
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, 
Collections.singletonList(1003L));
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, 
Collections.singletonList(1003L));
     this.alertFilter = new DimensionDetectionAlertFilter(provider, 
alertConfig,2500L);
 
     AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
index 1cd98a7093..6cff774ca9 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
@@ -52,6 +52,7 @@
 
   private List<MergedAnomalyResultDTO> detectedAnomalies;
   private LegacyAlertFilter legacyAlertFilter;
+  private LegacyAlertFilter legacyAlertFilterOnLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() throws Exception {
@@ -63,8 +64,23 @@ public void beforeMethod() throws Exception {
     this.detectedAnomalies.add(makeAnomaly(1002L, 3333, 9999));
     this.detectedAnomalies.add(makeAnomaly(1003L, 1100, 1500));
 
+    // Anomalies generated by legacy pipeline
+    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
+
+
     DataProvider mockDataProvider = new 
MockDataProvider().setAnomalies(this.detectedAnomalies);
 
+    DetectionAlertConfigDTO detectionAlertConfig = 
createDetectionAlertConfig();
+    this.legacyAlertFilter = new LegacyAlertFilter(mockDataProvider, 
detectionAlertConfig, 2500L);
+
+    DetectionAlertConfigDTO detectionAlertConfigLegacyAnomalies = 
createDetectionAlertConfig();
+    detectionAlertConfigLegacyAnomalies.setOnlyFetchLegacyAnomalies(true);
+    this.legacyAlertFilterOnLegacyAnomalies = new 
LegacyAlertFilter(mockDataProvider, detectionAlertConfigLegacyAnomalies, 2500L);
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
     DetectionAlertConfigDTO detectionAlertConfig = new 
DetectionAlertConfigDTO();
     Map<String, Object> properties = new HashMap<>();
     properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
@@ -74,10 +90,9 @@ public void beforeMethod() throws Exception {
     properties.put(PROP_LEGACY_ALERT_FILTER_CLASS_NAME, 
"com.linkedin.thirdeye.detector.email.filter.DummyAlertFilter");
     properties.put(PROP_LEGACY_ALERT_FILTER_CONFIG, "");
     detectionAlertConfig.setProperties(properties);
-
     detectionAlertConfig.setVectorClocks(new HashMap<Long, Long>());
 
-    this.legacyAlertFilter = new LegacyAlertFilter(mockDataProvider, 
detectionAlertConfig, 2500L);
+    return detectionAlertConfig;
   }
 
   @Test
@@ -86,4 +101,12 @@ public void testRun() throws Exception {
     Assert.assertEquals(result.getResult().get(RECEIVER_ADDRESSES),
         new HashSet<>(this.detectedAnomalies.subList(0, 4)));
   }
+
+  @Test
+  public void testFetchingLegacyAnomalies() throws Exception {
+    DetectionAlertFilterResult result = 
this.legacyAlertFilterOnLegacyAnomalies.run();
+    Assert.assertEquals(result.getAllAnomalies().size(), 2);
+    Assert.assertEquals(result.getResult().get(RECEIVER_ADDRESSES),
+        new HashSet<>(this.detectedAnomalies.subList(7, 9)));
+  }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
index 16dd1bb649..41b81e511f 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
@@ -58,9 +58,9 @@
   private DetectionAlertFilter alertFilter;
   private List<MergedAnomalyResultDTO> detectedAnomalies;
 
-  private Map<String, Object> properties;
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
+  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -72,23 +72,36 @@ public void beforeMethod() {
     this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999));
     this.detectedAnomalies.add(makeAnomaly(1003L,1100, 1500));
 
-    this.provider = new 
MockDataProvider().setAnomalies(this.detectedAnomalies);
+    // Anomalies generated by legacy pipeline
+    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
 
-    this.alertConfig = new DetectionAlertConfigDTO();
+    this.provider = new MockDataProvider()
+        .setAnomalies(this.detectedAnomalies);
 
-    this.properties = new HashMap<>();
+    this.alertConfig = createDetectionAlertConfig();
+    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
+    this.alertConfigForLegacyAnomalies.setOnlyFetchLegacyAnomalies(true);
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+    Map<String, Object> properties = new HashMap<>();
     Map<String, Set<String>> recipients = new HashMap<>();
     recipients.put(PROP_TO, PROP_TO_VALUE);
     recipients.put(PROP_CC, PROP_CC_VALUE);
     recipients.put(PROP_BCC, PROP_BCC_VALUE);
-    this.properties.put(PROP_RECIPIENTS, recipients);
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+    properties.put(PROP_RECIPIENTS, recipients);
+    properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
 
-    this.alertConfig.setProperties(properties);
+    alertConfig.setProperties(properties);
     Map<Long, Long> vectorClocks = new HashMap<>();
     vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
-    this.alertConfig.setVectorClocks(vectorClocks);
+    alertConfig.setVectorClocks(vectorClocks);
 
+    return alertConfig;
   }
 
   @Test
@@ -99,9 +112,18 @@ public void testGetAlertFilterResult() throws Exception {
     Assert.assertEquals(result.getResult().get(RECIPIENTS), new 
HashSet<>(this.detectedAnomalies.subList(0, 4)));
   }
 
+  @Test
+  public void testGetAlertFilterResultForLegacyAnomalies() throws Exception {
+    this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider, 
this.alertConfigForLegacyAnomalies,2500L);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+    Assert.assertEquals(result.getResult().get(RECIPIENTS).size(), 2);
+    Assert.assertEquals(result.getResult().get(RECIPIENTS), new 
HashSet<>(this.detectedAnomalies.subList(7, 9)));
+  }
+
   @Test
   public void testAlertFilterFeedback() throws Exception {
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, 
Collections.singletonList(1003L));
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, 
Collections.singletonList(1003L));
     this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider, 
this.alertConfig,2500L);
 
     AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to