akshayrai closed pull request #3495: [TE] New detection alerter to support
alerting of legacy anomalies
URL: https://github.com/apache/incubator-pinot/pull/3495
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a foreign pull request
(one opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
index 5d4c825ba2..07984f265f 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
@@ -36,6 +36,7 @@
String from;
String cronExpression;
String application;
+ boolean onlyFetchLegacyAnomalies;
Map<String, Map<String, Object>> alertSchemes;
Map<String, Map<String, Object>> alertSuppressors;
@@ -48,6 +49,14 @@
Map<String, String> refLinks;
+ public boolean isOnlyFetchLegacyAnomalies() {
+ return onlyFetchLegacyAnomalies;
+ }
+
+ public void setOnlyFetchLegacyAnomalies(boolean onlyFetchLegacyAnomalies) {
+ this.onlyFetchLegacyAnomalies = onlyFetchLegacyAnomalies;
+ }
+
public boolean isActive() {
return active;
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
index 51e1a91d0d..c2e70e5ba0 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
@@ -65,6 +65,18 @@
*/
Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices,
List<String> dimensions);
+ /**
+ * Returns a multimap of anomalies (keyed by slice) generated by the legacy
detection pipeline.
+ *
+ * @see MergedAnomalyResultDTO
+ * @see AnomalySlice
+ *
+ * @param slices anomaly slice
+ * @param configId configId
+ * @return multimap of anomalies (keyed by slice)
+ */
+ Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchLegacyAnomalies(Collection<AnomalySlice> slices, long configId);
+
/**
* Returns a multimap of anomalies (keyed by slice) for a given set of
slices.
*
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index 4ea926b3b0..a13138d486 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -32,6 +32,7 @@
import com.linkedin.thirdeye.datalayer.util.Predicate;
import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
+import com.linkedin.thirdeye.detection.alert.StatefulDetectionAlertFilter;
import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import com.linkedin.thirdeye.detection.spi.model.EventSlice;
import java.util.ArrayList;
@@ -45,9 +46,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DefaultDataProvider implements DataProvider {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultDataProvider.class);
private static final long TIMEOUT = 60000;
private final ExecutorService executor = Executors.newCachedThreadPool();
@@ -122,39 +126,59 @@ public DataFrame call() throws Exception {
}
}
- @Override
- public Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+ private Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchAnomalies(Collection<AnomalySlice> slices,
+ long configId, boolean isLegacy) {
+ String functionIdKey = "detectionConfigId";
+ if (isLegacy) {
+ functionIdKey = "functionId";
+ }
+
Multimap<AnomalySlice, MergedAnomalyResultDTO> output =
ArrayListMultimap.create();
for (AnomalySlice slice : slices) {
List<Predicate> predicates = new ArrayList<>();
- if (slice.getEnd() >= 0)
+ if (slice.getEnd() >= 0) {
predicates.add(Predicate.LT("startTime", slice.getEnd()));
- if (slice.getStart() >= 0)
+ }
+ if (slice.getStart() >= 0) {
predicates.add(Predicate.GT("endTime", slice.getStart()));
- if (configId >= 0)
- predicates.add(Predicate.EQ("detectionConfigId", configId));
+ }
+ if (configId >= 0) {
+ predicates.add(Predicate.EQ(functionIdKey, configId));
+ }
- if (predicates.isEmpty())
- throw new IllegalArgumentException("Must provide at least one of
start, end, or detectionConfigId");
+ if (predicates.isEmpty()) throw new IllegalArgumentException("Must
provide at least one of start, end, or " + functionIdKey);
List<MergedAnomalyResultDTO> anomalies =
this.anomalyDAO.findByPredicate(AND(predicates));
- Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
- while (itAnomaly.hasNext()) {
- MergedAnomalyResultDTO anomaly = itAnomaly.next();
- if (configId >= 0 && (anomaly.getDetectionConfigId() == null ||
anomaly.getDetectionConfigId() != configId)){
- itAnomaly.remove();
- }
-
- if (!slice.match(anomaly)) {
- itAnomaly.remove();
- }
+ anomalies.removeIf(anomaly -> !slice.match(anomaly));
+
+ if (isLegacy) {
+ anomalies.removeIf(anomaly ->
+ (configId >= 0) && (anomaly.getFunctionId() == null ||
anomaly.getFunctionId() != configId)
+ );
+ } else {
+ anomalies.removeIf(anomaly ->
+ (configId >= 0) && (anomaly.getDetectionConfigId() == null ||
anomaly.getDetectionConfigId() != configId)
+ );
}
+ LOG.info("Fetched {} legacy anomalies between (startTime = {}, endTime =
{}) with confid Id = {}", anomalies.size(),
+ slice.getStart(), slice.getEnd(), configId);
output.putAll(slice, anomalies);
}
+
return output;
}
+ @Override
+ public Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchLegacyAnomalies(Collection<AnomalySlice> slices, long configId) {
+ return fetchAnomalies(slices, configId, true);
+ }
+
+ @Override
+ public Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+ return fetchAnomalies(slices, configId, false);
+ }
+
@Override
public Multimap<EventSlice, EventDTO> fetchEvents(Collection<EventSlice>
slices) {
Multimap<EventSlice, EventDTO> output = ArrayListMultimap.create();
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
index 3887ca64dc..5d655e2e0b 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
@@ -102,6 +102,11 @@ private static long
getLastTimeStamp(Collection<MergedAnomalyResultDTO> anomalie
@Nullable
@Override
public Long apply(@Nullable MergedAnomalyResultDTO
mergedAnomalyResultDTO) {
+ // Return functionId to support alerting of legacy anomalies
+ if (mergedAnomalyResultDTO.getDetectionConfigId() == null) {
+ return mergedAnomalyResultDTO.getFunctionId();
+ }
+
return mergedAnomalyResultDTO.getDetectionConfigId();
}
});
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 669b442334..50285cf6c3 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -85,6 +85,8 @@ private void
updateAlertConfigWatermarks(DetectionAlertFilterResult result, Dete
AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(),
AlertUtils.makeVectorClock(result.getAllAnomalies()))
);
+
+ LOG.info("Saving watermarks for alertConfigDAO : {}",
alertConfig.toString());
this.alertConfigDAO.save(alertConfig);
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
index 89db67332e..3b2044c464 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
@@ -33,6 +33,7 @@
public abstract class StatefulDetectionAlertFilter extends
DetectionAlertFilter {
+
public StatefulDetectionAlertFilter(DataProvider provider,
DetectionAlertConfigDTO config, long endTime) {
super(provider, config, endTime);
}
@@ -47,11 +48,16 @@ public DetectionAlertFilterResult run() throws Exception {
protected final Set<MergedAnomalyResultDTO> filter(Map<Long, Long>
vectorClocks, final long minId) {
// retrieve all candidate anomalies
Set<MergedAnomalyResultDTO> allAnomalies = new HashSet<>();
- for (Long detectionConfigId : vectorClocks.keySet()) {
- long startTime = vectorClocks.get(detectionConfigId);
+ for (Long functionId : vectorClocks.keySet()) {
+ long startTime = vectorClocks.get(functionId);
AnomalySlice slice = new
AnomalySlice().withStart(startTime).withEnd(this.endTime);
- Collection<MergedAnomalyResultDTO> candidates =
this.provider.fetchAnomalies(Collections.singletonList(slice),
detectionConfigId).get(slice);
+ Collection<MergedAnomalyResultDTO> candidates;
+ if (this.config.isOnlyFetchLegacyAnomalies()) {
+ candidates =
this.provider.fetchLegacyAnomalies(Collections.singletonList(slice),
functionId).get(slice);
+ } else {
+ candidates =
this.provider.fetchAnomalies(Collections.singletonList(slice),
functionId).get(slice);
+ }
Collection<MergedAnomalyResultDTO> anomalies =
Collections2.filter(candidates, new
Predicate<MergedAnomalyResultDTO>() {
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
index 05b0454ec8..488f1f2748 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
@@ -70,13 +70,19 @@ public LegacyAlertFilter(DataProvider provider,
DetectionAlertConfigDTO config,
public DetectionAlertFilterResult run() {
DetectionAlertFilterResult result = new DetectionAlertFilterResult();
- for (Long detectionConfigId : this.detectionConfigIds) {
- long startTime = MapUtils.getLong(this.vectorClocks, detectionConfigId,
0L);
+ for (Long functionId : this.detectionConfigIds) {
+ long startTime = MapUtils.getLong(this.vectorClocks, functionId, 0L);
- AnomalySlice slice =
- new AnomalySlice().withStart(startTime).withEnd(this.endTime);
- Collection<MergedAnomalyResultDTO> candidates =
- this.provider.fetchAnomalies(Collections.singletonList(slice),
detectionConfigId).get(slice);
+ AnomalySlice slice = new AnomalySlice()
+ .withStart(startTime)
+ .withEnd(this.endTime);
+
+ Collection<MergedAnomalyResultDTO> candidates;
+ if (this.config.isOnlyFetchLegacyAnomalies()) {
+ candidates =
this.provider.fetchLegacyAnomalies(Collections.singletonList(slice),
functionId).get(slice);
+ } else {
+ candidates =
this.provider.fetchAnomalies(Collections.singletonList(slice),
functionId).get(slice);
+ }
Collection<MergedAnomalyResultDTO> anomalies =
Collections2.filter(candidates, new
Predicate<MergedAnomalyResultDTO>() {
@@ -86,7 +92,11 @@ public boolean apply(@Nullable MergedAnomalyResultDTO
mergedAnomaly) {
}
});
- result.addMapping(this.alertConfig.getReceiverAddresses(), new
HashSet<>(anomalies));
+ if (result.getResult().get(this.alertConfig.getReceiverAddresses()) ==
null) {
+ result.addMapping(this.alertConfig.getReceiverAddresses(), new
HashSet<>(anomalies));
+ } else {
+
result.getResult().get(this.alertConfig.getReceiverAddresses()).addAll(anomalies);
+ }
}
return result;
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
index 58b8bdbf12..3f14d92cc9 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
@@ -35,6 +35,7 @@
* The detection alert filter that sends the anomaly email to all recipients
*/
public class ToAllRecipientsDetectionAlertFilter extends
StatefulDetectionAlertFilter {
+
private static final String PROP_RECIPIENTS = "recipients";
private static final String PROP_TO = "to";
private static final String PROP_CC = "cc";
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
index 97495f1f9d..5d763e2fad 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
@@ -176,7 +176,7 @@ private void
generateAndSendEmails(DetectionAlertFilterResult detectionResult) t
public void run() throws Exception {
Preconditions.checkNotNull(result);
if (result.getAllAnomalies().size() == 0) {
- LOG.info("Zero anomalies found, skipping sending iris alert for {}",
config.getId());
+ LOG.info("Zero anomalies found, skipping sending email alert for {}",
config.getId());
return;
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
index 49ac6b0076..bf4f37e0dd 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
@@ -26,13 +26,14 @@
public class DetectionTestUtils {
private static final Long PROP_ID_VALUE = 1000L;
- public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start,
long end, String metric, String dataset, Map<String, String> dimensions) {
+ public static MergedAnomalyResultDTO makeAnomaly(Long configId, Long
legacyFunctionId, long start, long end, String metric, String dataset,
Map<String, String> dimensions) {
MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
anomaly.setDetectionConfigId(configId);
anomaly.setStartTime(start);
anomaly.setEndTime(end);
anomaly.setMetric(metric);
anomaly.setCollection(dataset);
+ anomaly.setFunctionId(legacyFunctionId);
DimensionMap dimMap = new DimensionMap();
dimMap.putAll(dimensions);
@@ -41,10 +42,18 @@ public static MergedAnomalyResultDTO makeAnomaly(Long
configId, long start, long
return anomaly;
}
+ public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start,
long end, String metric, String dataset, Map<String, String> dimensions) {
+ return DetectionTestUtils.makeAnomaly(configId, null, start, end, metric,
dataset, dimensions);
+ }
+
public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
return DetectionTestUtils.makeAnomaly(PROP_ID_VALUE, start, end, null,
null, Collections.<String, String>emptyMap());
}
+ public static MergedAnomalyResultDTO makeAnomaly(Long configId, Long
legacyFuncId, long start, long end) {
+ return DetectionTestUtils.makeAnomaly(configId, legacyFuncId, start, end,
null, null, Collections.<String, String>emptyMap());
+ }
+
public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start,
long end) {
return DetectionTestUtils.makeAnomaly(configId, start, end, null, null,
Collections.<String, String>emptyMap());
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
index 084e4dc05d..02770d150f 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
@@ -123,14 +123,20 @@ public boolean apply(long... values) {
return result;
}
- @Override
- public Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+ private Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchAnomalies(Collection<AnomalySlice> slices,
+ long configId, boolean isLegacy) {
Multimap<AnomalySlice, MergedAnomalyResultDTO> result =
ArrayListMultimap.create();
for (AnomalySlice slice : slices) {
for (MergedAnomalyResultDTO anomaly : this.anomalies) {
if (slice.match(anomaly)) {
- if (configId >= 0 && (anomaly.getDetectionConfigId() == null ||
anomaly.getDetectionConfigId() != configId)){
- continue;
+ if (isLegacy) {
+ if (configId >= 0 && (anomaly.getFunctionId() == null ||
anomaly.getFunctionId() != configId)) {
+ continue;
+ }
+ } else {
+ if (configId >= 0 && (anomaly.getDetectionConfigId() == null ||
anomaly.getDetectionConfigId() != configId)) {
+ continue;
+ }
}
result.put(slice, anomaly);
}
@@ -139,6 +145,18 @@ public boolean apply(long... values) {
return result;
}
+ @Override
+ public Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchLegacyAnomalies(Collection<AnomalySlice> slices,
+ long configId) {
+ return fetchAnomalies(slices, configId, true);
+ }
+
+ @Override
+ public Multimap<AnomalySlice, MergedAnomalyResultDTO>
fetchAnomalies(Collection<AnomalySlice> slices,
+ long configId) {
+ return fetchAnomalies(slices, configId, false);
+ }
+
@Override
public Multimap<EventSlice, EventDTO> fetchEvents(Collection<EventSlice>
slices) {
Multimap<EventSlice, EventDTO> result = ArrayListMultimap.create();
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
index ef81d6fa86..b96bfa14d9 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
@@ -65,9 +65,9 @@
private DetectionAlertFilter alertFilter;
private List<MergedAnomalyResultDTO> detectedAnomalies;
- private Map<String, Object> properties;
private MockDataProvider provider;
private DetectionAlertConfigDTO alertConfig;
+ private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
@BeforeMethod
public void beforeMethod() {
@@ -81,25 +81,38 @@ public void beforeMethod() {
this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999,
Collections.singletonMap("key", "value")));
this.detectedAnomalies.add(makeAnomaly(1003L,1111, 9999,
Collections.singletonMap("key", "value")));
- this.provider = new
MockDataProvider().setAnomalies(this.detectedAnomalies);
+ // Anomalies generated by legacy pipeline
+ this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+ this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+ this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
- this.alertConfig = new DetectionAlertConfigDTO();
+ this.provider = new MockDataProvider()
+ .setAnomalies(this.detectedAnomalies);
- this.properties = new HashMap<>();
+ this.alertConfig = createDetectionAlertConfig();
+ this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
+ }
+
+ private DetectionAlertConfigDTO createDetectionAlertConfig() {
+ DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+ Map<String, Object> properties = new HashMap<>();
Map<String, Set<String>> recipients = new HashMap<>();
recipients.put(PROP_TO, PROP_TO_VALUE);
recipients.put(PROP_CC, PROP_CC_VALUE);
recipients.put(PROP_BCC, PROP_BCC_VALUE);
- this.properties.put(PROP_RECIPIENTS, recipients);
- this.properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
- this.properties.put(PROP_DIMENSION, PROP_DIMENSION_VALUE);
- this.properties.put(PROP_DIMENSION_TO, PROP_DIMENSION_TO_VALUE);
+ properties.put(PROP_RECIPIENTS, recipients);
+ properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+ properties.put(PROP_DIMENSION, PROP_DIMENSION_VALUE);
+ properties.put(PROP_DIMENSION_TO, PROP_DIMENSION_TO_VALUE);
- this.alertConfig.setProperties(this.properties);
+ alertConfig.setProperties(properties);
Map<Long, Long> vectorClocks = new HashMap<>();
vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
- this.alertConfig.setVectorClocks(vectorClocks);
+ alertConfig.setVectorClocks(vectorClocks);
+
+ return alertConfig;
}
@Test
@@ -119,7 +132,7 @@ public void testAlertFilterRecipients() throws Exception {
@Test
public void testAlertFilterNoChildren() throws Exception {
- this.properties.put(PROP_DETECTION_CONFIG_IDS,
Collections.singletonList(1003L));
+ this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS,
Collections.singletonList(1003L));
this.alertFilter = new DimensionDetectionAlertFilter(provider,
alertConfig,2500L);
MergedAnomalyResultDTO child = makeAnomaly(1003L, 1234, 9999);
@@ -136,7 +149,7 @@ public void testAlertFilterNoChildren() throws Exception {
@Test
public void testAlertFilterFeedback() throws Exception {
- this.properties.put(PROP_DETECTION_CONFIG_IDS,
Collections.singletonList(1003L));
+ this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS,
Collections.singletonList(1003L));
this.alertFilter = new DimensionDetectionAlertFilter(provider,
alertConfig,2500L);
AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
index 1cd98a7093..6cff774ca9 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
@@ -52,6 +52,7 @@
private List<MergedAnomalyResultDTO> detectedAnomalies;
private LegacyAlertFilter legacyAlertFilter;
+ private LegacyAlertFilter legacyAlertFilterOnLegacyAnomalies;
@BeforeMethod
public void beforeMethod() throws Exception {
@@ -63,8 +64,23 @@ public void beforeMethod() throws Exception {
this.detectedAnomalies.add(makeAnomaly(1002L, 3333, 9999));
this.detectedAnomalies.add(makeAnomaly(1003L, 1100, 1500));
+ // Anomalies generated by legacy pipeline
+ this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+ this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+ this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
+
+
DataProvider mockDataProvider = new
MockDataProvider().setAnomalies(this.detectedAnomalies);
+ DetectionAlertConfigDTO detectionAlertConfig =
createDetectionAlertConfig();
+ this.legacyAlertFilter = new LegacyAlertFilter(mockDataProvider,
detectionAlertConfig, 2500L);
+
+ DetectionAlertConfigDTO detectionAlertConfigLegacyAnomalies =
createDetectionAlertConfig();
+ detectionAlertConfigLegacyAnomalies.setOnlyFetchLegacyAnomalies(true);
+ this.legacyAlertFilterOnLegacyAnomalies = new
LegacyAlertFilter(mockDataProvider, detectionAlertConfigLegacyAnomalies, 2500L);
+ }
+
+ private DetectionAlertConfigDTO createDetectionAlertConfig() {
DetectionAlertConfigDTO detectionAlertConfig = new
DetectionAlertConfigDTO();
Map<String, Object> properties = new HashMap<>();
properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
@@ -74,10 +90,9 @@ public void beforeMethod() throws Exception {
properties.put(PROP_LEGACY_ALERT_FILTER_CLASS_NAME,
"com.linkedin.thirdeye.detector.email.filter.DummyAlertFilter");
properties.put(PROP_LEGACY_ALERT_FILTER_CONFIG, "");
detectionAlertConfig.setProperties(properties);
-
detectionAlertConfig.setVectorClocks(new HashMap<Long, Long>());
- this.legacyAlertFilter = new LegacyAlertFilter(mockDataProvider,
detectionAlertConfig, 2500L);
+ return detectionAlertConfig;
}
@Test
@@ -86,4 +101,12 @@ public void testRun() throws Exception {
Assert.assertEquals(result.getResult().get(RECEIVER_ADDRESSES),
new HashSet<>(this.detectedAnomalies.subList(0, 4)));
}
+
+ @Test
+ public void testFetchingLegacyAnomalies() throws Exception {
+ DetectionAlertFilterResult result =
this.legacyAlertFilterOnLegacyAnomalies.run();
+ Assert.assertEquals(result.getAllAnomalies().size(), 2);
+ Assert.assertEquals(result.getResult().get(RECEIVER_ADDRESSES),
+ new HashSet<>(this.detectedAnomalies.subList(7, 9)));
+ }
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
index 16dd1bb649..41b81e511f 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
@@ -58,9 +58,9 @@
private DetectionAlertFilter alertFilter;
private List<MergedAnomalyResultDTO> detectedAnomalies;
- private Map<String, Object> properties;
private MockDataProvider provider;
private DetectionAlertConfigDTO alertConfig;
+ private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
@BeforeMethod
public void beforeMethod() {
@@ -72,23 +72,36 @@ public void beforeMethod() {
this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999));
this.detectedAnomalies.add(makeAnomaly(1003L,1100, 1500));
- this.provider = new
MockDataProvider().setAnomalies(this.detectedAnomalies);
+ // Anomalies generated by legacy pipeline
+ this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+ this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+ this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
- this.alertConfig = new DetectionAlertConfigDTO();
+ this.provider = new MockDataProvider()
+ .setAnomalies(this.detectedAnomalies);
- this.properties = new HashMap<>();
+ this.alertConfig = createDetectionAlertConfig();
+ this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
+ this.alertConfigForLegacyAnomalies.setOnlyFetchLegacyAnomalies(true);
+ }
+
+ private DetectionAlertConfigDTO createDetectionAlertConfig() {
+ DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+ Map<String, Object> properties = new HashMap<>();
Map<String, Set<String>> recipients = new HashMap<>();
recipients.put(PROP_TO, PROP_TO_VALUE);
recipients.put(PROP_CC, PROP_CC_VALUE);
recipients.put(PROP_BCC, PROP_BCC_VALUE);
- this.properties.put(PROP_RECIPIENTS, recipients);
- this.properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+ properties.put(PROP_RECIPIENTS, recipients);
+ properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
- this.alertConfig.setProperties(properties);
+ alertConfig.setProperties(properties);
Map<Long, Long> vectorClocks = new HashMap<>();
vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
- this.alertConfig.setVectorClocks(vectorClocks);
+ alertConfig.setVectorClocks(vectorClocks);
+ return alertConfig;
}
@Test
@@ -99,9 +112,18 @@ public void testGetAlertFilterResult() throws Exception {
Assert.assertEquals(result.getResult().get(RECIPIENTS), new
HashSet<>(this.detectedAnomalies.subList(0, 4)));
}
+ @Test
+ public void testGetAlertFilterResultForLegacyAnomalies() throws Exception {
+ this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider,
this.alertConfigForLegacyAnomalies,2500L);
+
+ DetectionAlertFilterResult result = this.alertFilter.run();
+ Assert.assertEquals(result.getResult().get(RECIPIENTS).size(), 2);
+ Assert.assertEquals(result.getResult().get(RECIPIENTS), new
HashSet<>(this.detectedAnomalies.subList(7, 9)));
+ }
+
@Test
public void testAlertFilterFeedback() throws Exception {
- this.properties.put(PROP_DETECTION_CONFIG_IDS,
Collections.singletonList(1003L));
+ this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS,
Collections.singletonList(1003L));
this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider,
this.alertConfig,2500L);
AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]