jihaozh closed pull request #3586: [TE] detection pipeline - multiple improvements
URL: https://github.com/apache/incubator-pinot/pull/3586
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
index 57d7fb5641..30be06f65d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
@@ -259,7 +259,7 @@ private void storeData(AnomalyDetectionOutputContext anomalyDetectionOutputConte
       historyMergedAnomalies = Collections.emptyList();
     }
-    LOG.info("Analyzing anomaly function with explored dimensions: {}, windowStart: {}, windowEnd: {}",
+    LOG.info("[Old pipeline] Analyzing anomaly function with explored dimensions: {}, windowStart: {}, windowEnd: {}",
         dimensionMap, windowStart, windowEnd);
     AnomalyUtils.logAnomaliesOverlapWithWindow(windowStart, windowEnd, historyMergedAnomalies);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 42cbc6205f..aad718a043 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -405,6 +405,9 @@ public MergedAnomalyResultBean convertMergeAnomalyDTO2Bean(MergedAnomalyResultDT
     for (MergedAnomalyResultDTO child : entity.getChildren()) {
       if (child.getId() == null) {
         // only allow single level to prevent cycles
+        if (child == entity) {
+          throw new IllegalArgumentException("Cannot contain itself as child anomaly");
+        }
         if (child.getChildren() != null && !child.getChildren().isEmpty()) {
           throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
         }
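
The new guard compares object identity (==), so an anomaly registered as its own child fails fast instead of recursing. A minimal standalone sketch of the same check (class and method names are local to this sketch, not the ThirdEye API):

    import java.util.ArrayList;
    import java.util.List;

    public class SelfChildGuardDemo {
      static class Node {
        final List<Node> children = new ArrayList<>();
      }

      // mirrors the identity check added in convertMergeAnomalyDTO2Bean
      static void validate(Node entity) {
        for (Node child : entity.children) {
          if (child == entity) {
            throw new IllegalArgumentException("Cannot contain itself as child anomaly");
          }
        }
      }

      public static void main(String[] args) {
        Node n = new Node();
        n.children.add(n); // cycle of length one
        try {
          validate(n); // fails fast instead of recursing
        } catch (IllegalArgumentException expected) {
          System.out.println(expected.getMessage());
        }
      }
    }
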
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index d4035aaf8e..c53af497b9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -28,6 +28,7 @@
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -59,6 +60,8 @@
  private static final Logger LOGGER = LoggerFactory.getLogger(DetectionMigrationResource.class);
private static final String PROP_WINDOW_DELAY = "windowDelay";
private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+ private static final String PROP_WINDOW_SIZE = "windowSize";
+ private static final String PROP_WINDOW_UNIT = "windowUnit";
private final LegacyAnomalyFunctionTranslator translator;
private final AnomalyFunctionManager anomalyFunctionDAO;
@@ -115,8 +118,21 @@ public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws Exc
ruleYaml.put("name", "myRule");
// detection
- ruleYaml.put("detection", Collections.singletonList(
- ImmutableMap.of("type", "ALGORITHM", "params",
getAlgorithmDetectorParams(anomalyFunctionDTO))));
+ if (anomalyFunctionDTO.getType().equals("WEEK_OVER_WEEK_RULE")){
+ // wo1w change detector
+ ruleYaml.put("detection",
Collections.singletonList(ImmutableMap.of("type", "PERCENTAGE_RULE",
+ "params",
getPercentageChangeRuleDetectorParams(anomalyFunctionDTO))));
+ } else if (anomalyFunctionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+ // threshold detector
+ ruleYaml.put("detection",
Collections.singletonList(ImmutableMap.of("type", "THRESHOLD",
+ "params",
getMinMaxThresholdRuleDetectorParams(anomalyFunctionDTO))));
+ } else{
+ // algorithm detector
+ ruleYaml.put("detection", Collections.singletonList(
+ ImmutableMap.of("type", "ALGORITHM", "params",
getAlgorithmDetectorParams(anomalyFunctionDTO),
+ PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize(),
+ PROP_WINDOW_UNIT,
anomalyFunctionDTO.getWindowUnit().toString())));
+ }
// filters
Map<String, String> alertFilter = anomalyFunctionDTO.getAlertFilter();
@@ -157,7 +173,7 @@ public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws Exc
return this.yaml.dump(yamlConfigs);
}
- private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO
functionDTO) {
+ private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO
functionDTO) throws IOException {
Map<String, Object> dimensionExploreYaml = new LinkedHashMap<>();
dimensionExploreYaml.put("dimensions",
Collections.singletonList(functionDTO.getExploreDimensions()));
if (functionDTO.getDataFilter() != null &&
!functionDTO.getDataFilter().isEmpty() &&
functionDTO.getDataFilter().get("type").equals("average_threshold")) {
@@ -166,6 +182,13 @@ public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws Exc
       dimensionExploreYaml.put("minValue", Double.valueOf(functionDTO.getDataFilter().get("threshold")));
       dimensionExploreYaml.put("minLiveZone", functionDTO.getDataFilter().get("minLiveZone"));
     }
+    if (functionDTO.getType().equals("MIN_MAX_THRESHOLD")) {
+      // migrate volume threshold
+      Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+      if (properties.containsKey("averageVolumeThreshold")) {
+        dimensionExploreYaml.put("minValue", properties.getProperty("averageVolumeThreshold"));
+      }
+    }
     return dimensionExploreYaml;
   }
@@ -208,6 +231,33 @@ private String getBucketPeriod(AnomalyFunctionDTO functionDTO) {
     return new Period(TimeUnit.MILLISECONDS.convert(functionDTO.getBucketSize(), functionDTO.getBucketUnit())).toString();
   }
+ private Map<String, Object>
getPercentageChangeRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws
IOException {
+ Map<String, Object> detectorYaml = new LinkedHashMap<>();
+ Properties properties =
AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+ double threshold =
Double.valueOf(properties.getProperty("changeThreshold"));
+ if (properties.containsKey("changeThreshold")){
+ detectorYaml.put("percentageChange", Math.abs(threshold));
+ if (threshold > 0){
+ detectorYaml.put("pattern", "UP");
+ } else {
+ detectorYaml.put("pattern", "DOWN");
+ }
+ }
+ return detectorYaml;
+ }
+
+ private Map<String, Object>
getMinMaxThresholdRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws
IOException {
+ Map<String, Object> detectorYaml = new LinkedHashMap<>();
+ Properties properties =
AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+ if (properties.containsKey("min")){
+ detectorYaml.put("min", properties.getProperty("min"));
+ }
+ if (properties.containsKey("max")){
+ detectorYaml.put("max", properties.getProperty("max"));
+ }
+ return detectorYaml;
+ }
+
private Map<String, Object> getAlgorithmDetectorParams(AnomalyFunctionDTO
functionDTO) throws Exception {
Map<String, Object> detectorYaml = new LinkedHashMap<>();
Map<String, Object> params = new LinkedHashMap<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index a51a3a62c7..dd63d06db5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -17,6 +17,7 @@
package com.linkedin.thirdeye.detection;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
import com.linkedin.thirdeye.constant.AnomalyResultSource;
import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
@@ -35,9 +36,11 @@
import com.linkedin.thirdeye.detection.finetune.TuningAlgorithm;
import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import com.wordnik.swagger.annotations.ApiParam;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -51,7 +54,9 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.MapUtils;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
+import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +66,9 @@
public class DetectionResource {
  private static final Logger LOG = LoggerFactory.getLogger(DetectionResource.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final String DAILY_CRON = "0 0 14 * * ? *";
+ private static final String HOURLY_CRON = "0 0 * * * ? *";
+ private static final String MINUTE_CRON = "0 0/15 * * * ? *";
private final MetricConfigManager metricDAO;
private final DatasetConfigManager datasetDAO;
@@ -164,11 +172,102 @@ public Response detectionPreview(
return Response.ok(result).build();
}
+ // return default bucket size based on cron schedule.
+ private long getBucketSize(DetectionConfigDTO config){
+ switch (config.getCron()) {
+ case DAILY_CRON:
+ // daily
+ return TimeUnit.DAYS.toMillis(1);
+ case MINUTE_CRON:
+ // minute level
+ return TimeUnit.MINUTES.toMillis(15);
+ case HOURLY_CRON:
+ // hourly
+ return TimeUnit.HOURS.toMillis(1);
+ }
+ throw new IllegalArgumentException("bucket size not determined");
+ }
+
+ // return default window size based on cron schedule.
+ private long getWindowSize(DetectionConfigDTO config) {
+ switch (config.getCron()) {
+ case DAILY_CRON:
+ // daily
+ return TimeUnit.DAYS.toMillis(1);
+ case MINUTE_CRON:
+ // minute level
+ return TimeUnit.HOURS.toMillis(6);
+ case HOURLY_CRON:
+ // hourly
+ return TimeUnit.HOURS.toMillis(24);
+ }
+ throw new IllegalArgumentException("window size not determined");
+ }
+
+ /*
+ Generates monitoring window based on cron schedule.
+ */
+ private List<Interval> getReplayMonitoringWindows(DetectionConfigDTO config,
long start, long end, Long windowSize, Long bucketSize) throws ParseException {
+ List<Interval> monitoringWindows = new ArrayList<>();
+ CronExpression cronExpression = new CronExpression(config.getCron());
+ DateTime currentStart = new DateTime(start);
+
+    long legacyWindowSize;
+    if (windowSize == null) {
+      legacyWindowSize = getWindowSize(config);
+      LOG.warn("[Legacy replay] window size not set when replaying {}. Using default window size {}", config.getId(), legacyWindowSize);
+    } else {
+      legacyWindowSize = windowSize;
+    }
+
+    long legacyBucketSize;
+    if (bucketSize == null) {
+      legacyBucketSize = getBucketSize(config);
+      LOG.warn("[Legacy replay] bucket size not set when replaying {}. Using default bucket size {}", config.getId(), legacyBucketSize);
+    } else {
+      legacyBucketSize = bucketSize;
+    }
+
+    // add offsets so that it replays all the moving windows within start time and end time
+    currentStart = currentStart.plus(legacyWindowSize).minus(legacyBucketSize);
+    if (config.getCron().equals(DAILY_CRON)) {
+      // daily detection: offset by 1 ms to pick up the first moving window
+      currentStart = currentStart.minus(1L);
+    }
+
+    DateTime currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+    DateTime endBoundary = new DateTime(cronExpression.getNextValidTimeAfter(new Date(end)));
+    while (currentEnd.isBefore(endBoundary)) {
+      monitoringWindows.add(new Interval(currentStart.getMillis(), currentEnd.getMillis()));
+      currentStart = currentEnd;
+      currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+    }
+    return monitoringWindows;
+  }
+
+  /**
+   * Legacy replay endpoint. Replays all the moving windows within the start time and end time.
+   * Saves the anomalies for each moving window before starting detection for the next window.
+   * Behaves exactly like the legacy replay endpoint.
+   * See also {@link com.linkedin.thirdeye.dashboard.resources.DetectionJobResource#generateAnomaliesInRangeForFunctions(String, String, String, String, Boolean, Boolean)}
+   * @param configId the config id to replay
+   * @param start start time
+   * @param end end time
+   * @param deleteExistingAnomaly (optional) whether to delete existing anomalies
+   * @param windowSize (optional) override the default window size
+   * @param bucketSize (optional) override the default bucket size
+   * @return anomalies
+   * @throws Exception
+   */
@POST
- @Path("/replay/{id}")
- public Response detectionReplay(@PathParam("id") long configId,
@QueryParam("start") long start,
+ @Path("/legacy-replay/{id}")
+ public Response legacyReplay(
+ @PathParam("id") long configId,
+ @QueryParam("start") long start,
@QueryParam("end") long end,
- @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean
deleteExistingAnomaly) throws Exception {
+ @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean
deleteExistingAnomaly,
+ @QueryParam("windowSize") Long windowSize,
+ @QueryParam("bucketSize") Long bucketSize) throws Exception {
DetectionConfigDTO config = this.configDAO.findById(configId);
if (config == null) {
@@ -188,6 +287,57 @@ public Response detectionReplay(@PathParam("id") long configId, @QueryParam("sta
this.anomalyDAO.deleteByIds(existingIds);
}
+    // execute replay
+    List<Interval> monitoringWindows = getReplayMonitoringWindows(config, start, end, windowSize, bucketSize);
+    for (Interval monitoringWindow : monitoringWindows) {
+      DetectionPipeline pipeline = this.loader.from(this.provider, config, monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
+      DetectionPipelineResult result = pipeline.run();
+
+      // save state
+      if (result.getLastTimestamp() > 0) {
+        config.setLastTimestamp(result.getLastTimestamp());
+      }
+
+      this.configDAO.update(config);
+
+      for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+        anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+        this.anomalyDAO.save(anomaly);
+      }
+    }
+
+    Collection<MergedAnomalyResultDTO> replayResult = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), replayResult.size());
+    return Response.ok(replayResult).build();
+  }
+
+  /**
+   * Replay for a given time range, without the cron schedule behavior.
+   */
+  @POST
+  @Path("/replay/{id}")
+  public Response detectionReplay(
+      @PathParam("id") long configId,
+      @QueryParam("start") long start,
+      @QueryParam("end") long end) throws Exception {
+
+    DetectionConfigDTO config = this.configDAO.findById(configId);
+    if (config == null) {
+      throw new IllegalArgumentException(String.format("Cannot find config %d", configId));
+    }
+
+    // clear existing anomalies
+    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+    Collection<MergedAnomalyResultDTO> existing = this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    List<Long> existingIds = new ArrayList<>();
+    for (MergedAnomalyResultDTO anomaly : existing) {
+      existingIds.add(anomaly.getId());
+    }
+
+    this.anomalyDAO.deleteByIds(existingIds);
+
     // execute replay
     DetectionPipeline pipeline = this.loader.from(this.provider, config, start, end);
     DetectionPipelineResult result = pipeline.run();
@@ -204,7 +354,6 @@ public Response detectionReplay(@PathParam("id") long configId, @QueryParam("sta
       this.anomalyDAO.save(anomaly);
     }
-    LOG.info("replay detection pipeline {} generated {} anomalies.", config.getId(), result.getAnomalies().size());
-    return Response.ok(result.getAnomalies()).build();
+    return Response.ok(result).build();
   }
 }
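
getReplayMonitoringWindows above walks the cron fire times between start and end. A trimmed sketch of that loop (assumes quartz and joda-time on the classpath; the offset and window/bucket-size handling from the real method is omitted):

    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import org.joda.time.DateTime;
    import org.joda.time.Interval;
    import org.quartz.CronExpression;

    public class CronWindowDemo {
      // Emits [previous fire, next fire) intervals between start and end.
      static List<Interval> windows(String cron, long start, long end) throws ParseException {
        CronExpression expr = new CronExpression(cron);
        List<Interval> out = new ArrayList<>();
        DateTime cur = new DateTime(start);
        DateTime next = new DateTime(expr.getNextValidTimeAfter(cur.toDate()));
        DateTime bound = new DateTime(expr.getNextValidTimeAfter(new Date(end)));
        while (next.isBefore(bound)) {
          out.add(new Interval(cur.getMillis(), next.getMillis()));
          cur = next;
          next = new DateTime(expr.getNextValidTimeAfter(cur.toDate()));
        }
        return out;
      }

      public static void main(String[] args) throws ParseException {
        // daily cron from the resource; fires at 14:00 in the JVM default time zone
        for (Interval w : windows("0 0 14 * * ? *", 1540000000000L, 1540400000000L)) {
          System.out.println(w);
        }
      }
    }
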
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
index f1bf20dc79..5f80b10de4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
@@ -65,8 +65,10 @@ public static boolean isReferenceName(String key) {
   }
   // get the component name from the reference key
+  // example: "$myRule:ALGORITHM:0" -> "myRule:ALGORITHM:0"
   public static String getComponentName(String key) {
-    return key.substring(1);
+    if (isReferenceName(key)) return key.substring(1);
+    else throw new IllegalArgumentException("not a component reference key; should start with '$'");
   }
   // get the spec class name for a component class
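
A standalone sketch of the "$"-prefix reference-key convention enforced above (the isReferenceName body is assumed here from the error message and the example comment in the diff):

    public class ReferenceKeyDemo {
      static boolean isReferenceName(String key) { return key.startsWith("$"); }

      static String getComponentName(String key) {
        if (isReferenceName(key)) return key.substring(1);
        throw new IllegalArgumentException("not a component reference key; should start with '$'");
      }

      public static void main(String[] args) {
        System.out.println(getComponentName("$myRule:ALGORITHM:0")); // myRule:ALGORITHM:0
        try {
          getComponentName("myRule:ALGORITHM:0");
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: missing '$' prefix");
        }
      }
    }
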
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 43f9c27cea..19b13fb765 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -46,7 +46,7 @@
   private static final String PROP_NESTED = "nested";
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_MERGE_KEY = "mergeKey";
-  private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new Comparator<MergedAnomalyResultDTO>() {
     @Override
@@ -67,7 +67,7 @@ public int compare(MergedAnomalyResultDTO o1, MergedAnomalyResultDTO o2) {
protected final List<Map<String, Object>> nestedProperties;
protected final long maxGap; // max time gap for merge
protected final long maxDuration; // max overall duration of merged anomaly
- private final AnomalySlice slice;
+ protected final AnomalySlice slice;
/**
* Instantiates a new merge wrapper.
@@ -181,7 +181,7 @@ public DetectionPipelineResult run() throws Exception {
return output;
}
- private long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+ protected long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
long time = this.startTime;
for (MergedAnomalyResultDTO anomaly : anomalies) {
time = Math.min(anomaly.getStartTime(), time);
@@ -189,7 +189,7 @@ private long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
return time;
}
- private long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+ protected long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
long time = this.endTime;
for (MergedAnomalyResultDTO anomaly : anomalies) {
time = Math.max(anomaly.getEndTime(), time);
@@ -214,7 +214,7 @@ public AnomalyKey(String metric, String collection, DimensionMap dimensions, Str
   public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
     return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
-        anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
+        anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_NAME));
   }
@Override
@@ -227,12 +227,13 @@ public boolean equals(Object o) {
     }
     AnomalyKey that = (AnomalyKey) o;
     return Objects.equals(metric, that.metric) && Objects.equals(collection, that.collection) && Objects.equals(
-        dimensions, that.dimensions) && Objects.equals(mergeKey, that.mergeKey);
+        dimensions, that.dimensions) && Objects.equals(mergeKey, that.mergeKey) && Objects.equals(componentKey, that.componentKey);
   }
   @Override
   public int hashCode() {
-    return Objects.hash(metric, collection, dimensions, mergeKey);
+    return Objects.hash(metric, collection, dimensions, mergeKey, componentKey);
   }
}
}
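
The effect of adding componentKey to equals/hashCode is that anomalies produced by different detector components no longer collapse onto one merge key. A trimmed stand-in (not the real AnomalyKey, which also carries collection and dimensions):

    import java.util.Objects;

    public class AnomalyKeyDemo {
      static final class Key {
        final String metric, mergeKey, componentKey;
        Key(String metric, String mergeKey, String componentKey) {
          this.metric = metric; this.mergeKey = mergeKey; this.componentKey = componentKey;
        }
        @Override public boolean equals(Object o) {
          if (!(o instanceof Key)) return false;
          Key k = (Key) o;
          return Objects.equals(metric, k.metric) && Objects.equals(mergeKey, k.mergeKey)
              && Objects.equals(componentKey, k.componentKey); // new field participates
        }
        @Override public int hashCode() { return Objects.hash(metric, mergeKey, componentKey); }
      }

      public static void main(String[] args) {
        Key a = new Key("m1", "merge", "rule1:THRESHOLD:0");
        Key b = new Key("m1", "merge", "rule2:PERCENTAGE_RULE:0");
        System.out.println(a.equals(b)); // false -- kept separate per detector
      }
    }
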
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index a650550897..0374892d1a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -18,8 +18,10 @@
import com.google.common.base.Preconditions;
import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyResult;
import com.linkedin.thirdeye.api.TimeGranularity;
import com.linkedin.thirdeye.api.TimeSpec;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -61,7 +63,8 @@
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_DETECTOR = "detector";
-  private static final String PROP_DETECTOR_COMPONENT_KEY = "detectorComponentKey";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
+  private static final String PROP_TIMEZONE = "timezone";
   private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
@@ -76,11 +79,13 @@
   private final MetricConfigDTO metric;
   private final MetricEntity metricEntity;
   private final boolean isMovingWindowDetection;
-  private DatasetConfigDTO dataset;
-  private DateTimeZone dateTimeZone;
   // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
-  private final String detectorReferenceKey;
+  private final String detectorName;
+  private final long windowSizeMillis;
+  private final DatasetConfigDTO dataset;
+  private final DateTimeZone dateTimeZone;
+
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
@@ -90,19 +95,30 @@ public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config,
     this.metric = provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
     Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR));
-    this.detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
-    Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorReferenceKey));
-    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorReferenceKey);
+    this.detectorName = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_DETECTOR));
+    Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorName));
+    this.anomalyDetector = (AnomalyDetector) this.config.getComponents().get(this.detectorName);
+    // emulate moving window or not
     this.isMovingWindowDetection = MapUtils.getBooleanValue(config.getProperties(), PROP_MOVING_WINDOW_DETECTION, false);
     // delays to wait for data to become available
     this.windowDelay = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_DELAY, 0);
+    // window delay unit
     this.windowDelayUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_DELAY_UNIT, "DAYS"));
     // detection window size
     this.windowSize = MapUtils.getIntValue(config.getProperties(), PROP_WINDOW_SIZE, 1);
     this.windowUnit = TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_UNIT, "DAYS"));
+    this.windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+    // run frequency, used to determine moving windows for minute-level detection
     Map<String, Object> frequency = MapUtils.getMap(config.getProperties(), PROP_FREQUENCY, Collections.emptyMap());
     this.functionFrequency = new TimeGranularity(MapUtils.getIntValue(frequency, "size", 15), TimeUnit.valueOf(MapUtils.getString(frequency, "unit", "MINUTES")));
+
+    MetricEntity me = MetricEntity.fromURN(this.metricUrn);
+    MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
+    this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset())).get(metricConfigDTO.getDataset());
+    // date time zone for moving windows; defaults to America/Los_Angeles if not configured
+    this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, "America/Los_Angeles"));
   }
@Override
@@ -112,6 +128,7 @@ public DetectionPipelineResult run() throws Exception {
     for (Interval window : monitoringWindows) {
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
+        LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
         anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
       } catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
@@ -125,27 +142,25 @@ public DetectionPipelineResult run() throws Exception {
       anomaly.setMetric(this.metric.getName());
       anomaly.setCollection(this.metric.getDataset());
       anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
-      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_KEY, this.detectorReferenceKey);
+      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.detectorName);
     }
     return new DetectionPipelineResult(anomalies);
   }
+  // get the list of monitoring windows; if no moving window is used, use start time and end time as the window
   List<Interval> getMonitoringWindows() {
     if (this.isMovingWindowDetection) {
       try {
         List<Interval> monitoringWindows = new ArrayList<>();
-        MetricEntity me = MetricEntity.fromURN(this.metricUrn);
-        MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
-        dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset())).get(metricConfigDTO.getDataset());
-        dateTimeZone = DateTimeZone.forID(dataset.getTimezone());
         List<Long> monitoringWindowEndTimes = getMonitoringWindowEndTimes();
         for (long monitoringEndTime : monitoringWindowEndTimes) {
           long endTime = monitoringEndTime - TimeUnit.MILLISECONDS.convert(windowDelay, windowDelayUnit);
-          long startTime = endTime - TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+          long startTime = endTime - this.windowSizeMillis;
           monitoringWindows.add(new Interval(startTime, endTime, dateTimeZone));
         }
+        for (Interval window : monitoringWindows) {
+          LOG.info("running detection in window {}", window);
+        }
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -154,6 +169,7 @@ public DetectionPipelineResult run() throws Exception {
     return Collections.singletonList(new Interval(startTime, endTime));
   }
+ // get the list of monitoring window end times
private List<Long> getMonitoringWindowEndTimes() {
List<Long> endTimes = new ArrayList<>();
@@ -169,6 +185,13 @@ public DetectionPipelineResult run() throws Exception {
return endTimes;
}
+  /**
+   * Round this time to the earlier boundary, depending on the granularity of the dataset.
+   * e.g. 12:15pm on an HOURLY dataset should be treated as 12pm.
+   * Any dataset with granularity finer than HOUR is rounded per the function frequency (assumed to be in MINUTES),
+   * so 12:53 on a 5 MINUTES dataset with function frequency 15 MINUTES is rounded to 12:45.
+   * See also {@link DetectionJobSchedulerUtils#getBoundaryAlignedTimeForDataset(DateTime, TimeUnit)}
+   */
private long getBoundaryAlignedTimeForDataset(DateTime currentTime) {
TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
TimeUnit dataUnit = timeSpec.getDataGranularity().getUnit();
@@ -192,6 +215,12 @@ private long getBoundaryAlignedTimeForDataset(DateTime currentTime) {
return currentTime.getMillis();
}
+  /**
+   * Get the bucket size in millis, according to the data granularity of the dataset.
+   * Bucket sizes are 1 HOUR for hourly and 1 DAY for daily datasets.
+   * For MINUTE level data, the bucket size is calculated based on the anomaly function frequency.
+   * See also {@link DetectionJobSchedulerUtils#getBucketSizePeriodForDataset(DatasetConfigDTO, AnomalyFunctionDTO)}
+   */
public Period getBucketSizePeriodForDataset() {
Period bucketSizePeriod = null;
TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
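
The boundary alignment described in the javadoc above amounts to flooring a timestamp to the nearest earlier bucket boundary. A simplified, epoch-aligned sketch (the real implementation also consults the dataset's TimeSpec and timezone):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public class BoundaryAlignDemo {
      // Floors a timestamp to the most recent multiple of bucketMillis (epoch-aligned).
      static long floorToBucket(long millis, long bucketMillis) {
        return (millis / bucketMillis) * bucketMillis;
      }

      public static void main(String[] args) {
        DateTimeZone utc = DateTimeZone.UTC;
        long t = new DateTime(2018, 10, 24, 12, 53, utc).getMillis();
        System.out.println(new DateTime(floorToBucket(t, 3600_000L), utc));    // 12:00 for HOURLY data
        System.out.println(new DateTime(floorToBucket(t, 15 * 60_000L), utc)); // 12:45 for 15 MINUTES frequency
      }
    }
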
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 64ffc47bb8..df08645a7d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -17,10 +17,13 @@
package com.linkedin.thirdeye.detection.wrapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.linkedin.thirdeye.dataframe.DoubleSeries;
import com.linkedin.thirdeye.dataframe.Series;
import com.linkedin.thirdeye.dataframe.util.MetricSlice;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.DetectionUtils;
import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
@@ -29,11 +32,15 @@
import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +48,7 @@
 /**
  * Baseline filling merger. This merger's merging behavior is the same as MergeWrapper, but it adds the capability
- * of filling baseline & current values.
+ * of filling baseline & current values and injects the detector and metric urn. Each detector has a separate baseline filling merge wrapper.
  */
 public class BaselineFillingMergeWrapper extends MergeWrapper {
   private static final Logger LOG = LoggerFactory.getLogger(MergeWrapper.class);
@@ -49,12 +56,15 @@
   private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
   private static final String PROP_CURRENT_PROVIDER = "currentValueProvider";
   private static final String PROP_METRIC_URN = "metricUrn";
-  private static final String PROP_BASELINE_PROVIDER_COMPONENT_KEY = "baselineProviderComponentKey";
+  private static final String PROP_BASELINE_PROVIDER_COMPONENT_NAME = "baselineProviderComponentName";
+  private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
   private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
   private BaselineProvider currentValueProvider;
-  private Series.DoubleFunction aggregationFunction;
   private String baselineProviderComponentKey;
+  private String detectorComponentName;
+  private String metricUrn;
   public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
   {
@@ -77,14 +87,24 @@ public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO con
       InputDataFetcher dataFetcher = new DefaultInputDataFetcher(this.provider, this.config.getId());
       this.currentValueProvider.init(spec, dataFetcher);
     }
+
+    // inject detector to nested property if possible
+    String detectorComponentKey = MapUtils.getString(config.getProperties(), PROP_DETECTOR);
+    if (detectorComponentKey != null) {
+      this.detectorComponentName = DetectionUtils.getComponentName(detectorComponentKey);
+      for (Map<String, Object> properties : this.nestedProperties) {
+        properties.put(PROP_DETECTOR, detectorComponentKey);
+      }
+    }
+
+    // inject metricUrn to nested property if possible
     String nestedUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
     if (nestedUrn != null) {
+      this.metricUrn = nestedUrn;
       for (Map<String, Object> properties : this.nestedProperties) {
         properties.put(PROP_METRIC_URN, nestedUrn);
       }
     }
-
-    this.aggregationFunction = BaselineAggregateType.valueOf(MapUtils.getString(config.getProperties(), "metricFunction", "MEAN")).getFunction();
   }
   @Override
@@ -92,6 +112,26 @@ public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO con
     return this.fillCurrentAndBaselineValue(super.merge(anomalies));
   }
+  @Override
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    AnomalySlice effectiveSlice = this.slice
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+    Collection<MergedAnomalyResultDTO> retrieved = this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice);
+
+    Collection<MergedAnomalyResultDTO> anomalies =
+        Collections2.filter(retrieved,
+            mergedAnomaly -> mergedAnomaly != null &&
+            !mergedAnomaly.isChild() &&
+            // only merge anomalies generated by the same detector
+            this.detectorComponentName.equals(mergedAnomaly.getProperties().getOrDefault(PROP_DETECTOR_COMPONENT_NAME, "")) &&
+            // only merge anomalies in the same dimension
+            this.metricUrn.equals(mergedAnomaly.getMetricUrn())
+        );
+    return new ArrayList<>(anomalies);
+  }
+
   /**
    * Fill in current and baseline value for the anomalies
    * @param mergedAnomalies anomalies
@@ -101,12 +141,25 @@ public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO con
     for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
       try {
         String metricUrn = anomaly.getMetricUrn();
-        final MetricSlice slice = MetricSlice.from(MetricEntity.fromURN(metricUrn).getId(), anomaly.getStartTime(), anomaly.getEndTime(),
-            MetricEntity.fromURN(metricUrn).getFilters());
+        MetricEntity me = MetricEntity.fromURN(metricUrn);
+        long metricId = me.getId();
+        MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(metricId)).get(metricId);
+        // aggregation function
+        Series.DoubleFunction aggregationFunction = DoubleSeries.MEAN;
+
+        try {
+          aggregationFunction = BaselineAggregateType.valueOf(metricConfigDTO.getDefaultAggFunction().name()).getFunction();
+        } catch (Exception e) {
+          LOG.warn("cannot get aggregation function for metric {}, using mean", metricId);
+        }
+
+        final MetricSlice slice = MetricSlice.from(metricId, anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
         anomaly.setAvgCurrentVal(this.currentValueProvider.computePredictedAggregates(slice, aggregationFunction));
         if (this.baselineValueProvider != null) {
           anomaly.setAvgBaselineVal(this.baselineValueProvider.computePredictedAggregates(slice, aggregationFunction));
-          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_KEY, this.baselineProviderComponentKey);
+          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_NAME, this.baselineProviderComponentKey);
+          anomaly.setWeight(calculateWeight(anomaly));
         }
       } catch (Exception e) {
         // ignore
@@ -116,4 +169,7 @@ public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO con
     return mergedAnomalies;
   }
+  private double calculateWeight(MergedAnomalyResultDTO anomaly) {
+    return (anomaly.getAvgCurrentVal() - anomaly.getAvgBaselineVal()) / anomaly.getAvgBaselineVal();
+  }
 }
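
calculateWeight above defines the anomaly weight as the relative change of current versus baseline. A minimal illustration (note that a zero baseline would yield an infinite weight):

    public class AnomalyWeightDemo {
      // weight = (current - baseline) / baseline, as in calculateWeight above
      static double weight(double avgCurrent, double avgBaseline) {
        return (avgCurrent - avgBaseline) / avgBaseline;
      }

      public static void main(String[] args) {
        System.out.println(weight(120.0, 100.0)); // 0.2  -> 20% above baseline
        System.out.println(weight(80.0, 100.0));  // -0.2 -> 20% below baseline
      }
    }
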
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 1382d9ef7a..9ffc5c28c5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,7 +20,6 @@
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
-import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,7 +36,7 @@
*/
 public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
   public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
-      throws Exception {
+  {
     super(provider, config, startTime, endTime);
   }
@@ -105,6 +104,7 @@ private MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO anomaly, M
newAnomaly.setFeedback(anomaly.getFeedback());
newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
newAnomaly.setScore(anomaly.getScore());
+ newAnomaly.setWeight(anomaly.getWeight());
return newAnomaly;
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 45ecf73bd9..45cad5bfca 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -138,6 +138,7 @@
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_MERGER = "merger";
+  private static final String PROP_TIMEZONE = "timezone";
   private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
   private static final Map<String, String> DETECTOR_TO_BASELINE = ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE");
@@ -227,7 +228,6 @@ YamlTranslationResult translateYaml() {
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, AnomalyDetectorWrapper.class.getName());
     String detectorKey = makeComponentKey(ruleName, detectorType, id);
-    nestedProperties.put(PROP_DETECTOR, detectorKey);
     fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
@@ -242,6 +242,7 @@ YamlTranslationResult translateYaml() {
     }
     String baselineProviderKey = makeComponentKey(ruleName, baselineProviderType, id);
     properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+    properties.put(PROP_DETECTOR, detectorKey);
     buildComponentSpec(yamlConfig, baselineProviderType, baselineProviderKey);
     properties.putAll(this.mergerProperties);
     return properties;
@@ -283,6 +284,9 @@ private void fillInWindowSizeAndUnit(Map<String, Object> properties, Map<String,
     if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
       properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, PROP_WINDOW_DELAY_UNIT));
     }
+    if (yamlConfig.containsKey(PROP_TIMEZONE)) {
+      properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
+    }
   }
}
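
The timezone handling added above is a plain optional-key passthrough from the YAML config into the wrapper properties. A standalone sketch (assumes commons-collections on the classpath):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.commons.collections.MapUtils;

    public class TimezonePassthroughDemo {
      // copies an optional "timezone" key only when present, as in fillInWindowSizeAndUnit
      static void fillTimezone(Map<String, Object> properties, Map<String, Object> yamlConfig) {
        if (yamlConfig.containsKey("timezone")) {
          properties.put("timezone", MapUtils.getString(yamlConfig, "timezone"));
        }
      }

      public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        fillTimezone(props, Collections.singletonMap("timezone", "America/Los_Angeles"));
        System.out.println(props); // {timezone=America/Los_Angeles}
      }
    }
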
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
index ce1c03c49d..0d6891e5b6 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -40,11 +40,11 @@
   private static final String PROP_METRIC_URN = "metricUrn";
   private static final String PROP_MOVING_WINDOW_DETECTION = "isMovingWindowDetection";
   private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_TIMEZONE = "timezone";
   private MockDataProvider provider;
   private Map<String, Object> properties;
   private DetectionConfigDTO config;
-  private Map<String, Object> stageSpecs;
@BeforeMethod
public void setUp() {
@@ -80,6 +80,7 @@ public void testMonitoringWindow() {
   @Test
   public void testMovingMonitoringWindow() {
     this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
     AnomalyDetectorWrapper detectionPipeline =
         new AnomalyDetectorWrapper(this.provider, this.config, 1540147725000L, 1540493325000L);
     List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
@@ -89,4 +90,17 @@ public void testMovingMonitoringWindow() {
         new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
   }
+  @Test
+  public void testMovingMonitoringWindowBoundary() {
+    this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
+    AnomalyDetectorWrapper detectionPipeline =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1540080000000L, 1540425600000L);
+    List<Interval> monitoringWindows = detectionPipeline.getMonitoringWindows();
+    DateTimeZone timeZone = DateTimeZone.forID(TimeSpec.DEFAULT_TIMEZONE);
+    Assert.assertEquals(monitoringWindows,
+        Arrays.asList(new Interval(1540080000000L, 1540166400000L, timeZone), new Interval(1540166400000L, 1540252800000L, timeZone),
+            new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
+  }
+
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 998c8eb2d4..481c6f0995 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -17,10 +17,12 @@
package com.linkedin.thirdeye.detection.wrapper;
import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.constant.MetricAggFunction;
import com.linkedin.thirdeye.dataframe.DataFrame;
import com.linkedin.thirdeye.dataframe.util.MetricSlice;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
import com.linkedin.thirdeye.detection.DetectionPipelineResult;
@@ -95,16 +97,22 @@ public void beforeMethod() {
   @Test
   public void testMergerCurrentAndBaselineLoading() throws Exception {
     MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
+    anomaly.setProperties(ImmutableMap.of("detectorComponentName", "testDetector"));
     anomaly.setMetricUrn("thirdeye:metric:1");
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
     aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
+    MetricConfigDTO metric = new MetricConfigDTO();
+    metric.setId(1L);
+    metric.setDefaultAggFunction(MetricAggFunction.SUM);
     DataProvider
-        provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates);
+        provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
+        .setMetrics(Collections.singletonList(metric));
     this.config.getProperties().put(PROP_MAX_GAP, 100);
     this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+    this.config.getProperties().put("detector", "$testDetector");
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
index 050043c540..65fb1b5979 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
@@ -17,9 +17,9 @@
     "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
     "maxGap" : 0,
     "nested" : [ {
-      "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-      "detector" : "$rule1:THRESHOLD:0"
+      "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
     } ],
+    "detector" : "$rule1:THRESHOLD:0",
     "maxDuration" : 100
   } ]
 } ]
@@ -31,9 +31,9 @@
     "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
     "maxGap" : 0,
     "nested" : [ {
-      "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-      "detector" : "$rule2:THRESHOLD:0"
+      "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
     } ],
+    "detector" : "$rule2:THRESHOLD:0",
     "maxDuration" : 100
   } ]
 } ],
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
index 2d875a4ddc..bd43106dde 100644
--- a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
+++ b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
@@ -9,9 +9,9 @@
     "baselineValueProvider" : "$rule1:RULE_BASELINE:0",
     "className" : "com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
     "nested" : [ {
-      "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-      "detector" : "$rule1:THRESHOLD:0"
-    } ]
+      "className" : "com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
+    } ],
+    "detector" : "$rule1:THRESHOLD:0"
   } ],
   "minContribution" : 0.05,
   "dimensions" : [ "D1", "D2" ]
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]