jihaozh closed pull request #3586: [TE] detection pipeline - multiple 
improvements
URL: https://github.com/apache/incubator-pinot/pull/3586
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
index 57d7fb5641..30be06f65d 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/anomaly/detection/DetectionTaskRunner.java
@@ -259,7 +259,7 @@ private void storeData(AnomalyDetectionOutputContext 
anomalyDetectionOutputConte
       historyMergedAnomalies = Collections.emptyList();
     }
 
-    LOG.info("Analyzing anomaly function with explored dimensions: {}, 
windowStart: {}, windowEnd: {}",
+    LOG.info("[Old pipeline] Analyzing anomaly function with explored 
dimensions: {}, windowStart: {}, windowEnd: {}",
         dimensionMap, windowStart, windowEnd);
 
     AnomalyUtils.logAnomaliesOverlapWithWindow(windowStart, windowEnd, 
historyMergedAnomalies);
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 42cbc6205f..aad718a043 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -405,6 +405,9 @@ public MergedAnomalyResultBean 
convertMergeAnomalyDTO2Bean(MergedAnomalyResultDT
       for (MergedAnomalyResultDTO child : entity.getChildren()) {
         if (child.getId() == null) {
           // only allow single level to prevent cycles
+          if (child == entity){
+            throw new IllegalArgumentException("Cannot contain itself as child 
anomaly");
+          }
           if (child.getChildren() != null && !child.getChildren().isEmpty()) {
             throw new IllegalArgumentException("Multi-level anomaly nesting 
not supported");
           }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index d4035aaf8e..c53af497b9 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -28,6 +28,7 @@
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.detector.email.filter.AlertFilterFactory;
 import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -59,6 +60,8 @@
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DetectionMigrationResource.class);
   private static final String PROP_WINDOW_DELAY = "windowDelay";
   private static final String PROP_WINDOW_DELAY_UNIT = "windowDelayUnit";
+  private static final String PROP_WINDOW_SIZE = "windowSize";
+  private static final String PROP_WINDOW_UNIT = "windowUnit";
 
   private final LegacyAnomalyFunctionTranslator translator;
   private final AnomalyFunctionManager anomalyFunctionDAO;
@@ -115,8 +118,21 @@ public String migrateToYaml(@QueryParam("id") long 
anomalyFunctionId) throws Exc
     ruleYaml.put("name", "myRule");
 
     // detection
-    ruleYaml.put("detection", Collections.singletonList(
-        ImmutableMap.of("type", "ALGORITHM", "params", 
getAlgorithmDetectorParams(anomalyFunctionDTO))));
+    if (anomalyFunctionDTO.getType().equals("WEEK_OVER_WEEK_RULE")){
+      // wo1w change detector
+      ruleYaml.put("detection", 
Collections.singletonList(ImmutableMap.of("type", "PERCENTAGE_RULE",
+          "params", 
getPercentageChangeRuleDetectorParams(anomalyFunctionDTO))));
+    } else if (anomalyFunctionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+      // threshold detector
+      ruleYaml.put("detection", 
Collections.singletonList(ImmutableMap.of("type", "THRESHOLD",
+          "params", 
getMinMaxThresholdRuleDetectorParams(anomalyFunctionDTO))));
+    } else{
+      // algorithm detector
+      ruleYaml.put("detection", Collections.singletonList(
+          ImmutableMap.of("type", "ALGORITHM", "params", 
getAlgorithmDetectorParams(anomalyFunctionDTO),
+              PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize(),
+              PROP_WINDOW_UNIT, 
anomalyFunctionDTO.getWindowUnit().toString())));
+    }
 
     // filters
     Map<String, String> alertFilter = anomalyFunctionDTO.getAlertFilter();
@@ -157,7 +173,7 @@ public String migrateToYaml(@QueryParam("id") long 
anomalyFunctionId) throws Exc
     return this.yaml.dump(yamlConfigs);
   }
 
-  private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO 
functionDTO) {
+  private Map<String, Object> getDimensionExplorationParams(AnomalyFunctionDTO 
functionDTO) throws IOException {
     Map<String, Object> dimensionExploreYaml = new LinkedHashMap<>();
     dimensionExploreYaml.put("dimensions", 
Collections.singletonList(functionDTO.getExploreDimensions()));
     if (functionDTO.getDataFilter() != null && 
!functionDTO.getDataFilter().isEmpty() && 
functionDTO.getDataFilter().get("type").equals("average_threshold")) {
@@ -166,6 +182,13 @@ public String migrateToYaml(@QueryParam("id") long 
anomalyFunctionId) throws Exc
       dimensionExploreYaml.put("minValue", 
Double.valueOf(functionDTO.getDataFilter().get("threshold")));
       dimensionExploreYaml.put("minLiveZone", 
functionDTO.getDataFilter().get("minLiveZone"));
     }
+    if (functionDTO.getType().equals("MIN_MAX_THRESHOLD")){
+      // migrate volume threshold
+      Properties properties = 
AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+      if (properties.containsKey("averageVolumeThreshold")){
+        dimensionExploreYaml.put("minValue", 
properties.getProperty("averageVolumeThreshold"));
+      }
+    }
     return dimensionExploreYaml;
   }
 
@@ -208,6 +231,33 @@ private String getBucketPeriod(AnomalyFunctionDTO 
functionDTO) {
     return new 
Period(TimeUnit.MILLISECONDS.convert(functionDTO.getBucketSize(), 
functionDTO.getBucketUnit())).toString();
   }
 
+  /**
+   * Builds the detector params for a PERCENTAGE_RULE detector from the properties of a legacy anomaly function.
+   *
+   * The signed "changeThreshold" property is split into an absolute "percentageChange" and a "pattern"
+   * ("UP" for a positive threshold, "DOWN" otherwise). If the property is absent the returned map is empty.
+   *
+   * @param functionDTO the legacy anomaly function to migrate
+   * @return detector params, in insertion order
+   * @throws IOException if the function properties cannot be converted
+   * @throws NumberFormatException if "changeThreshold" is present but not a parsable double
+   */
+  private Map<String, Object> getPercentageChangeRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws IOException {
+    Map<String, Object> detectorYaml = new LinkedHashMap<>();
+    Properties properties = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+    // parse only once the key is known to exist; parsing a missing (null) value throws before the guard is reached
+    if (properties.containsKey("changeThreshold")){
+      double threshold = Double.parseDouble(properties.getProperty("changeThreshold"));
+      detectorYaml.put("percentageChange", Math.abs(threshold));
+      if (threshold > 0){
+        detectorYaml.put("pattern", "UP");
+      } else {
+        detectorYaml.put("pattern", "DOWN");
+      }
+    }
+    return detectorYaml;
+  }
+
+  /**
+   * Builds the detector params for a THRESHOLD detector from the properties of a legacy anomaly function.
+   *
+   * Copies the "min" and "max" properties, when present, as their raw string values.
+   *
+   * @param functionDTO the legacy anomaly function to migrate
+   * @return detector params holding "min" and/or "max", in that order
+   * @throws IOException if the function properties cannot be converted
+   */
+  private Map<String, Object> getMinMaxThresholdRuleDetectorParams(AnomalyFunctionDTO functionDTO) throws IOException {
+    Map<String, Object> thresholdParams = new LinkedHashMap<>();
+    Properties legacyProps = AnomalyFunctionDTO.toProperties(functionDTO.getProperties());
+    for (String bound : new String[] {"min", "max"}) {
+      if (legacyProps.containsKey(bound)) {
+        thresholdParams.put(bound, legacyProps.getProperty(bound));
+      }
+    }
+    return thresholdParams;
+  }
+
   private Map<String, Object> getAlgorithmDetectorParams(AnomalyFunctionDTO 
functionDTO) throws Exception {
     Map<String, Object> detectorYaml = new LinkedHashMap<>();
     Map<String, Object> params = new LinkedHashMap<>();
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index a51a3a62c7..dd63d06db5 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -17,6 +17,7 @@
 package com.linkedin.thirdeye.detection;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
 import com.linkedin.thirdeye.constant.AnomalyResultSource;
 import com.linkedin.thirdeye.datalayer.bao.DatasetConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
@@ -35,9 +36,11 @@
 import com.linkedin.thirdeye.detection.finetune.TuningAlgorithm;
 import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.wordnik.swagger.annotations.ApiParam;
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,7 +54,9 @@
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.MapUtils;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +66,9 @@
 public class DetectionResource {
   private static final Logger LOG = 
LoggerFactory.getLogger(DetectionResource.class);
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final String DAILY_CRON = "0 0 14 * * ? *";
+  private static final String HOURLY_CRON = "0 0 * * * ? *";
+  private static final String MINUTE_CRON = "0 0/15 * * * ? *";
 
   private final MetricConfigManager metricDAO;
   private final DatasetConfigManager datasetDAO;
@@ -164,11 +172,102 @@ public Response detectionPreview(
     return Response.ok(result).build();
   }
 
+  /**
+   * Returns the default bucket size, in milliseconds, for the cron schedule of the given config.
+   *
+   * @param config detection config whose cron string selects the bucket size
+   * @return 1 day for the daily cron, 15 minutes for the minute-level cron, 1 hour for the hourly cron
+   * @throws IllegalArgumentException if the cron string matches none of the known schedules
+   */
+  private long getBucketSize(DetectionConfigDTO config){
+    final String cron = config.getCron();
+    if (cron.equals(DAILY_CRON)) {
+      return TimeUnit.DAYS.toMillis(1);
+    }
+    if (cron.equals(MINUTE_CRON)) {
+      return TimeUnit.MINUTES.toMillis(15);
+    }
+    if (cron.equals(HOURLY_CRON)) {
+      return TimeUnit.HOURS.toMillis(1);
+    }
+    throw new IllegalArgumentException("bucket size not determined");
+  }
+
+  /**
+   * Returns the default window size, in milliseconds, for the cron schedule of the given config.
+   *
+   * @param config detection config whose cron string selects the window size
+   * @return 1 day for the daily cron, 6 hours for the minute-level cron, 24 hours for the hourly cron
+   * @throws IllegalArgumentException if the cron string matches none of the known schedules
+   */
+  private long getWindowSize(DetectionConfigDTO config) {
+    final String schedule = config.getCron();
+    if (schedule.equals(DAILY_CRON)) {
+      return TimeUnit.DAYS.toMillis(1);
+    } else if (schedule.equals(MINUTE_CRON)) {
+      return TimeUnit.HOURS.toMillis(6);
+    } else if (schedule.equals(HOURLY_CRON)) {
+      return TimeUnit.HOURS.toMillis(24);
+    }
+    throw new IllegalArgumentException("window size not determined");
+  }
+
+  /**
+   * Generates the monitoring windows to replay, based on the cron schedule of the config.
+   *
+   * The first window starts at {@code start + windowSize - bucketSize} (shifted back by one millisecond
+   * for the daily cron) and every window ends at the next cron fire time after its start. Windows are
+   * produced until the window end reaches the first cron fire time after {@code end}.
+   *
+   * @param config detection config whose cron expression drives the window boundaries
+   * @param start replay start time, in epoch millis
+   * @param end replay end time, in epoch millis
+   * @param windowSize window size in millis, or null to use the default for the cron schedule
+   * @param bucketSize bucket size in millis, or null to use the default for the cron schedule
+   * @return the consecutive monitoring windows, possibly empty
+   * @throws ParseException if the cron expression of the config cannot be parsed
+   */
+  private List<Interval> getReplayMonitoringWindows(DetectionConfigDTO config, long start, long end, Long windowSize, Long bucketSize) throws ParseException {
+    List<Interval> monitoringWindows = new ArrayList<>();
+    CronExpression cronExpression = new CronExpression(config.getCron());
+    DateTime currentStart = new DateTime(start);
+
+    long legacyWindowSize;
+    if(windowSize == null){
+      legacyWindowSize = getWindowSize(config);
+      LOG.warn("[Legacy replay] window size not set when replay {}. Use default window size {}", config.getId(), legacyWindowSize);
+    } else {
+      legacyWindowSize = windowSize;
+    }
+
+    long legacyBucketSize;
+    if (bucketSize == null){
+      legacyBucketSize = getBucketSize(config);
+      LOG.warn("[Legacy replay] bucket size not set when replay {}. Use default bucket size {}", config.getId(), legacyBucketSize);
+    } else {
+      legacyBucketSize = bucketSize;
+    }
+
+    // add offsets so that it would replay all the moving windows within start time and end time
+    currentStart = currentStart.plus(legacyWindowSize).minus(legacyBucketSize);
+    if (config.getCron().equals(DAILY_CRON)){
+      // daily detection offset by 1 to pick up the first moving window.
+      // DateTime is immutable: minus() returns a new instance, so the result must be assigned back.
+      currentStart = currentStart.minus(1L);
+    }
+
+    DateTime currentEnd = new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+    DateTime endBoundary = new DateTime(cronExpression.getNextValidTimeAfter(new Date(end)));
+    while (currentEnd.isBefore(endBoundary)) {
+      monitoringWindows.add(new Interval(currentStart.getMillis(), currentEnd.getMillis()));
+      currentStart = currentEnd;
+      currentEnd =  new DateTime(cronExpression.getNextValidTimeAfter(currentStart.toDate()));
+    }
+    return monitoringWindows;
+  }
+
+  /**
+   * Legacy replay endpoint. Replay all the moving windows within start time 
and end time.
+   * Saves anomaly for each moving window before starting detection for next 
window.
+   * Behaves exactly like the legacy replay endpoint.
+   * See also {@link com.linkedin.thirdeye.dashboard.resources.DetectionJobResource#generateAnomaliesInRangeForFunctions(String, String, String, String, Boolean, Boolean)}
+   * @param configId the config id to replay
+   * @param start start time
+   * @param end end time
+   * @param deleteExistingAnomaly (optional) delete existing anomaly or not
+   * @param windowSize (optional) override the default window size
+   * @param bucketSize (optional) override the default bucket size
+   * @return anomalies
+   * @throws Exception
+   */
   @POST
-  @Path("/replay/{id}")
-  public Response detectionReplay(@PathParam("id") long configId, 
@QueryParam("start") long start,
+  @Path("/legacy-replay/{id}")
+  public Response legacyReplay(
+      @PathParam("id") long configId,
+      @QueryParam("start") long start,
       @QueryParam("end") long end,
-      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean 
deleteExistingAnomaly) throws Exception {
+      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean 
deleteExistingAnomaly,
+      @QueryParam("windowSize") Long windowSize,
+      @QueryParam("bucketSize") Long bucketSize) throws Exception {
 
     DetectionConfigDTO config = this.configDAO.findById(configId);
     if (config == null) {
@@ -188,6 +287,57 @@ public Response detectionReplay(@PathParam("id") long 
configId, @QueryParam("sta
       this.anomalyDAO.deleteByIds(existingIds);
     }
 
+    // execute replay
+    List<Interval> monitoringWindows = getReplayMonitoringWindows(config, 
start, end, windowSize, bucketSize);
+    for (Interval monitoringWindow : monitoringWindows){
+      DetectionPipeline pipeline = this.loader.from(this.provider, config, 
monitoringWindow.getStartMillis(), monitoringWindow.getEndMillis());
+      DetectionPipelineResult result = pipeline.run();
+
+      // save state
+      if (result.getLastTimestamp() > 0) {
+        config.setLastTimestamp(result.getLastTimestamp());
+      }
+
+      this.configDAO.update(config);
+
+      for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+        anomaly.setAnomalyResultSource(AnomalyResultSource.ANOMALY_REPLAY);
+        this.anomalyDAO.save(anomaly);
+      }
+    }
+
+    Collection<MergedAnomalyResultDTO> replayResult = 
this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    LOG.info("replay detection pipeline {} generated {} anomalies.", 
config.getId(), replayResult.size());
+    return Response.ok(replayResult).build();
+  }
+
+  /**
+   * Replay for a given time range. Without cron schedule behavior
+   */
+  @POST
+  @Path("/replay/{id}")
+  public Response detectionReplay(
+      @PathParam("id") long configId,
+      @QueryParam("start") long start,
+      @QueryParam("end") long end) throws Exception {
+
+    DetectionConfigDTO config = this.configDAO.findById(configId);
+    if (config == null) {
+      throw new IllegalArgumentException(String.format("Cannot find config 
%d", configId));
+    }
+
+    // clear existing anomalies
+    AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
+    Collection<MergedAnomalyResultDTO> existing = 
this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
+
+    List<Long> existingIds = new ArrayList<>();
+    for (MergedAnomalyResultDTO anomaly : existing) {
+      existingIds.add(anomaly.getId());
+    }
+
+    this.anomalyDAO.deleteByIds(existingIds);
+
     // execute replay
     DetectionPipeline pipeline = this.loader.from(this.provider, config, 
start, end);
     DetectionPipelineResult result = pipeline.run();
@@ -204,7 +354,6 @@ public Response detectionReplay(@PathParam("id") long 
configId, @QueryParam("sta
       this.anomalyDAO.save(anomaly);
     }
 
-    LOG.info("replay detection pipeline {} generated {} anomalies.", 
config.getId(), result.getAnomalies().size());
-    return Response.ok(result.getAnomalies()).build();
+    return Response.ok(result).build();
   }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
index f1bf20dc79..5f80b10de4 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionUtils.java
@@ -65,8 +65,10 @@ public static boolean isReferenceName(String key) {
   }
 
   // get the component name from the reference key
+  // example "$myRule:ALGORITHM:0" -> "myRule:ALGORITHM:0"
   public static String getComponentName(String key) {
-    return key.substring(1);
+    if (isReferenceName(key)) return key.substring(1);
+    else throw new IllegalArgumentException("not a component reference key. 
should starts with $");
   }
 
   // get the spec class name for a component class
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 43f9c27cea..19b13fb765 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -46,7 +46,7 @@
   private static final String PROP_NESTED = "nested";
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_MERGE_KEY = "mergeKey";
-  private static final String PROP_DETECTOR_COMPONENT_KEY = 
"detectorComponentKey";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = 
"detectorComponentName";
 
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new 
Comparator<MergedAnomalyResultDTO>() {
     @Override
@@ -67,7 +67,7 @@ public int compare(MergedAnomalyResultDTO o1, 
MergedAnomalyResultDTO o2) {
   protected final List<Map<String, Object>> nestedProperties;
   protected final long maxGap; // max time gap for merge
   protected final long maxDuration; // max overall duration of merged anomaly
-  private final AnomalySlice slice;
+  protected final AnomalySlice slice;
 
   /**
    * Instantiates a new merge wrapper.
@@ -181,7 +181,7 @@ public DetectionPipelineResult run() throws Exception {
     return output;
   }
 
-  private long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+  protected long getStartTime(Iterable<MergedAnomalyResultDTO> anomalies) {
     long time = this.startTime;
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       time = Math.min(anomaly.getStartTime(), time);
@@ -189,7 +189,7 @@ private long getStartTime(Iterable<MergedAnomalyResultDTO> 
anomalies) {
     return time;
   }
 
-  private long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
+  protected long getEndTime(Iterable<MergedAnomalyResultDTO> anomalies) {
     long time = this.endTime;
     for (MergedAnomalyResultDTO anomaly : anomalies) {
       time = Math.max(anomaly.getEndTime(), time);
@@ -214,7 +214,7 @@ public AnomalyKey(String metric, String collection, 
DimensionMap dimensions, Str
 
     public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
       return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), 
anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
-          anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
+          anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_NAME));
     }
 
     @Override
@@ -227,12 +227,13 @@ public boolean equals(Object o) {
       }
       AnomalyKey that = (AnomalyKey) o;
       return Objects.equals(metric, that.metric) && Objects.equals(collection, 
that.collection) && Objects.equals(
-          dimensions, that.dimensions) && Objects.equals(mergeKey, 
that.mergeKey);
+          dimensions, that.dimensions) && Objects.equals(mergeKey, 
that.mergeKey) && Objects.equals(componentKey,
+          that.componentKey);
     }
 
     @Override
     public int hashCode() {
-      return Objects.hash(metric, collection, dimensions, mergeKey);
+      return Objects.hash(metric, collection, dimensions, mergeKey, 
componentKey);
     }
   }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index a650550897..0374892d1a 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -18,8 +18,10 @@
 
 import com.google.common.base.Preconditions;
 import com.linkedin.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyResult;
 import com.linkedin.thirdeye.api.TimeGranularity;
 import com.linkedin.thirdeye.api.TimeSpec;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import com.linkedin.thirdeye.datalayer.dto.DatasetConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -61,7 +63,8 @@
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_DETECTOR = "detector";
-  private static final String PROP_DETECTOR_COMPONENT_KEY = 
"detectorComponentKey";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = 
"detectorComponentName";
+  private static final String PROP_TIMEZONE = "timezone";
 
   private static final Logger LOG = LoggerFactory.getLogger(
       AnomalyDetectorWrapper.class);
@@ -76,11 +79,13 @@
   private final MetricConfigDTO metric;
   private final MetricEntity metricEntity;
   private final boolean isMovingWindowDetection;
-  private DatasetConfigDTO dataset;
-  private DateTimeZone dateTimeZone;
   // need to specify run frequency for minute level detection. Used for moving 
monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
-  private final String detectorReferenceKey;
+  private final String detectorName;
+  private final long windowSizeMillis;
+  private final DatasetConfigDTO dataset;
+  private final DateTimeZone dateTimeZone;
+
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO 
config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
@@ -90,19 +95,30 @@ public AnomalyDetectorWrapper(DataProvider provider, 
DetectionConfigDTO config,
     this.metric = 
provider.fetchMetrics(Collections.singleton(this.metricEntity.getId())).get(this.metricEntity.getId());
 
     
Preconditions.checkArgument(this.config.getProperties().containsKey(PROP_DETECTOR));
-    this.detectorReferenceKey = 
DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), 
PROP_DETECTOR));
-    
Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorReferenceKey));
-    this.anomalyDetector = (AnomalyDetector) 
this.config.getComponents().get(this.detectorReferenceKey);
+    this.detectorName = 
DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), 
PROP_DETECTOR));
+    
Preconditions.checkArgument(this.config.getComponents().containsKey(this.detectorName));
+    this.anomalyDetector = (AnomalyDetector) 
this.config.getComponents().get(this.detectorName);
 
+    // emulate moving window or not
     this.isMovingWindowDetection = 
MapUtils.getBooleanValue(config.getProperties(), PROP_MOVING_WINDOW_DETECTION, 
false);
     // delays to wait for data becomes available
     this.windowDelay = MapUtils.getIntValue(config.getProperties(), 
PROP_WINDOW_DELAY, 0);
+    // window delay unit
     this.windowDelayUnit = 
TimeUnit.valueOf(MapUtils.getString(config.getProperties(), 
PROP_WINDOW_DELAY_UNIT, "DAYS"));
     // detection window size
     this.windowSize = MapUtils.getIntValue(config.getProperties(), 
PROP_WINDOW_SIZE, 1);
     this.windowUnit = 
TimeUnit.valueOf(MapUtils.getString(config.getProperties(), PROP_WINDOW_UNIT, 
"DAYS"));
+    this.windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, 
windowUnit);
+    // run frequency, used to determine moving windows for minute-level 
detection
     Map<String, Object> frequency = MapUtils.getMap(config.getProperties(), 
PROP_FREQUENCY, Collections.emptyMap());
     this.functionFrequency = new 
TimeGranularity(MapUtils.getIntValue(frequency, "size", 15), 
TimeUnit.valueOf(MapUtils.getString(frequency, "unit", "MINUTES")));
+
+    MetricEntity me = MetricEntity.fromURN(this.metricUrn);
+    MetricConfigDTO metricConfigDTO = 
this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
+    this.dataset = 
this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
+        .get(metricConfigDTO.getDataset());
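+    // date time zone for moving windows. defaults to America/Los_Angeles when the "timezone" property is not set
+    // NOTE(review): the dataset time zone is not used as the fallback here - confirm this is intended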
+    this.dateTimeZone = 
DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, 
"America/Los_Angeles"));
   }
 
   @Override
@@ -112,6 +128,7 @@ public DetectionPipelineResult run() throws Exception {
     for (Interval window : monitoringWindows) {
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
+        LOG.info("[New Pipeline] running detection for config {} metricUrn {}. 
start time {}, end time{}", config.getId(), metricUrn, window.getStart(), 
window.getEnd());
         anomaliesForOneWindow = anomalyDetector.runDetection(window, 
this.metricUrn);
       } catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to 
{} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
@@ -125,27 +142,25 @@ public DetectionPipelineResult run() throws Exception {
       anomaly.setMetric(this.metric.getName());
       anomaly.setCollection(this.metric.getDataset());
       
anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
-      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_KEY, 
this.detectorReferenceKey);
+      anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, 
this.detectorName);
     }
     return new DetectionPipelineResult(anomalies);
   }
 
+  // get a list of the monitoring window, if no sliding window used, use start 
time and end time as window
   List<Interval> getMonitoringWindows() {
     if (this.isMovingWindowDetection) {
       try{
         List<Interval> monitoringWindows = new ArrayList<>();
-        MetricEntity me = MetricEntity.fromURN(this.metricUrn);
-        MetricConfigDTO metricConfigDTO =
-            
this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
-        dataset = 
this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
-            .get(metricConfigDTO.getDataset());
-        dateTimeZone = DateTimeZone.forID(dataset.getTimezone());
         List<Long> monitoringWindowEndTimes = getMonitoringWindowEndTimes();
         for (long monitoringEndTime : monitoringWindowEndTimes) {
           long endTime = monitoringEndTime - 
TimeUnit.MILLISECONDS.convert(windowDelay, windowDelayUnit);
-          long startTime = endTime - TimeUnit.MILLISECONDS.convert(windowSize, 
windowUnit);
+          long startTime = endTime - this.windowSizeMillis;
           monitoringWindows.add(new Interval(startTime, endTime, 
dateTimeZone));
         }
+        for (Interval window : monitoringWindows){
+          LOG.info("running detections in windows {}", window);
+        }
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with 
single detection window", e);
@@ -154,6 +169,7 @@ public DetectionPipelineResult run() throws Exception {
     return Collections.singletonList(new Interval(startTime, endTime));
   }
 
+  // get the list of monitoring window end times
   private List<Long> getMonitoringWindowEndTimes() {
     List<Long> endTimes = new ArrayList<>();
 
@@ -169,6 +185,13 @@ public DetectionPipelineResult run() throws Exception {
     return endTimes;
   }
 
+  /**
+   * round this time to earlier boundary, depending on granularity of dataset
+   * e.g. 12:15pm on HOURLY dataset should be treated as 12pm
+   * any dataset with granularity finer than HOUR, will be rounded as per 
function frequency (assumption is that this is in MINUTES)
+   * so 12.53 on 5 MINUTES dataset, with function frequency 15 MINUTES will be 
rounded to 12.45
+   * See also {@link 
DetectionJobSchedulerUtils#getBoundaryAlignedTimeForDataset(DateTime, TimeUnit)}
+   */
   private long getBoundaryAlignedTimeForDataset(DateTime currentTime) {
     TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
     TimeUnit dataUnit = timeSpec.getDataGranularity().getUnit();
@@ -192,6 +215,12 @@ private long getBoundaryAlignedTimeForDataset(DateTime 
currentTime) {
     return currentTime.getMillis();
   }
 
+  /**
+   * get bucket size as a period, according to data granularity of dataset
+   * Bucket size is 1 HOUR for hourly, 1 DAY for daily
+   * For MINUTE level data, bucket size is calculated based on anomaly function frequency
+   * See also {@link DetectionJobSchedulerUtils#getBucketSizePeriodForDataset(DatasetConfigDTO, AnomalyFunctionDTO)}
+   */
   public Period getBucketSizePeriodForDataset() {
     Period bucketSizePeriod = null;
     TimeSpec timeSpec = ThirdEyeUtils.getTimeSpecFromDatasetConfig(dataset);
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index 64ffc47bb8..df08645a7d 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -17,10 +17,13 @@
 package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.linkedin.thirdeye.dataframe.DoubleSeries;
 import com.linkedin.thirdeye.dataframe.Series;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionUtils;
 import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
@@ -29,11 +32,15 @@
 import com.linkedin.thirdeye.detection.components.RuleBaselineProvider;
 import com.linkedin.thirdeye.detection.spec.RuleBaselineProviderSpec;
 import com.linkedin.thirdeye.detection.spi.components.BaselineProvider;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.rootcause.impl.MetricEntity;
 import com.linkedin.thirdeye.rootcause.timeseries.BaselineAggregateType;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +48,7 @@
 
 /**
  * Baseline filling merger. This merger's merging behavior is the same as 
MergeWrapper. But add the capability
- * of filling baseline & current values.
+ * of filling baseline & current values and injecting the detector and metric urn
+ * into the nested properties. Each detector has a separate baseline filling merge wrapper.
  */
 public class BaselineFillingMergeWrapper extends MergeWrapper {
   private static final Logger LOG = 
LoggerFactory.getLogger(MergeWrapper.class);
@@ -49,12 +56,15 @@
   private static final String PROP_BASELINE_PROVIDER = "baselineValueProvider";
   private static final String PROP_CURRENT_PROVIDER = "currentValueProvider";
   private static final String PROP_METRIC_URN = "metricUrn";
-  private static final String PROP_BASELINE_PROVIDER_COMPONENT_KEY = 
"baselineProviderComponentKey";
+  private static final String PROP_BASELINE_PROVIDER_COMPONENT_NAME = 
"baselineProviderComponentName";
+  private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_DETECTOR_COMPONENT_NAME = 
"detectorComponentName";
 
   private BaselineProvider baselineValueProvider; // optionally configure a 
baseline value loader
   private BaselineProvider currentValueProvider;
-  private Series.DoubleFunction aggregationFunction;
   private String baselineProviderComponentKey;
+  private String detectorComponentName;
+  private String metricUrn;
 
   public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO 
config, long startTime, long endTime)
   {
@@ -77,14 +87,24 @@ public BaselineFillingMergeWrapper(DataProvider provider, 
DetectionConfigDTO con
       InputDataFetcher dataFetcher = new 
DefaultInputDataFetcher(this.provider, this.config.getId());
       this.currentValueProvider.init(spec, dataFetcher);
     }
+
+    // inject detector to nested property if possible
+    String detectorComponentKey = MapUtils.getString(config.getProperties(), 
PROP_DETECTOR);
+    if (detectorComponentKey != null){
+      this.detectorComponentName = 
DetectionUtils.getComponentName(detectorComponentKey);
+      for (Map<String, Object> properties : this.nestedProperties){
+        properties.put(PROP_DETECTOR, detectorComponentKey);
+      }
+    }
+
+    // inject metricUrn to nested property if possible
     String nestedUrn = MapUtils.getString(config.getProperties(), 
PROP_METRIC_URN);
     if (nestedUrn != null){
+      this.metricUrn = nestedUrn;
       for (Map<String, Object> properties : this.nestedProperties){
         properties.put(PROP_METRIC_URN, nestedUrn);
       }
     }
-
-    this.aggregationFunction = 
BaselineAggregateType.valueOf(MapUtils.getString(config.getProperties(), 
"metricFunction", "MEAN")).getFunction();
   }
 
   @Override
@@ -92,6 +112,26 @@ public BaselineFillingMergeWrapper(DataProvider provider, 
DetectionConfigDTO con
     return this.fillCurrentAndBaselineValue(super.merge(anomalies));
   }
 
+  /**
+   * Retrieves the existing anomalies that the newly generated ones may be merged with.
+   *
+   * The lookup window spans the generated anomalies, widened by {@code maxGap + 1} on
+   * both sides. Only non-child anomalies that carry the same detector component name
+   * and the same metric urn as this wrapper are kept.
+   *
+   * @param generated anomalies generated by the current run
+   * @return a new list holding the matching anomalies; empty if there are none
+   */
+  @Override
+  protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    AnomalySlice effectiveSlice = this.slice
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+    Collection<MergedAnomalyResultDTO> retrieved =
+        this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), this.config.getId()).get(effectiveSlice);
+    if (retrieved == null) {
+      // nothing stored for this slice, so there is nothing to merge with
+      return new ArrayList<>();
+    }
+
+    // detectorComponentName and metricUrn are only set when the matching property exists
+    // in the config, so both comparisons must tolerate null instead of throwing.
+    // A missing detector is treated like the "" default used for untagged anomalies.
+    final String expectedDetector = this.detectorComponentName == null ? "" : this.detectorComponentName;
+    final String expectedUrn = this.metricUrn;
+
+    Collection<MergedAnomalyResultDTO> anomalies =
+        Collections2.filter(retrieved,
+            mergedAnomaly -> mergedAnomaly != null &&
+                !mergedAnomaly.isChild() &&
+                // merge only if the anomaly was generated by the same detector
+                expectedDetector.equals(mergedAnomaly.getProperties().getOrDefault(PROP_DETECTOR_COMPONENT_NAME, "")) &&
+                // merge only if the anomaly is in the same dimension
+                (expectedUrn == null
+                    ? mergedAnomaly.getMetricUrn() == null
+                    : expectedUrn.equals(mergedAnomaly.getMetricUrn()))
+        );
+    return new ArrayList<>(anomalies);
+  }
+
   /**
    * Fill in current and baseline value for the anomalies
    * @param mergedAnomalies anomalies
@@ -101,12 +141,25 @@ public BaselineFillingMergeWrapper(DataProvider provider, 
DetectionConfigDTO con
     for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
       try {
         String metricUrn = anomaly.getMetricUrn();
-        final MetricSlice slice = 
MetricSlice.from(MetricEntity.fromURN(metricUrn).getId(), 
anomaly.getStartTime(), anomaly.getEndTime(),
-            MetricEntity.fromURN(metricUrn).getFilters());
+        MetricEntity me = MetricEntity.fromURN(metricUrn);
+        long metricId = me.getId();
+        MetricConfigDTO metricConfigDTO = 
this.provider.fetchMetrics(Collections.singletonList(metricId)).get(metricId);
+        // aggregation function
+        Series.DoubleFunction aggregationFunction = DoubleSeries.MEAN;
+
+        try {
+          aggregationFunction =
+              
BaselineAggregateType.valueOf(metricConfigDTO.getDefaultAggFunction().name()).getFunction();
+        } catch (Exception e) {
+          LOG.warn("cannot get aggregation function for metric, using 
average", metricId);
+        }
+
+        final MetricSlice slice = MetricSlice.from(metricId, 
anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
         
anomaly.setAvgCurrentVal(this.currentValueProvider.computePredictedAggregates(slice,
 aggregationFunction));
         if (this.baselineValueProvider != null) {
           
anomaly.setAvgBaselineVal(this.baselineValueProvider.computePredictedAggregates(slice,
 aggregationFunction));
-          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_KEY, 
this.baselineProviderComponentKey);
+          anomaly.getProperties().put(PROP_BASELINE_PROVIDER_COMPONENT_NAME, 
this.baselineProviderComponentKey);
+          anomaly.setWeight(calculateWeight(anomaly));
         }
       } catch (Exception e) {
         // ignore
@@ -116,4 +169,7 @@ public BaselineFillingMergeWrapper(DataProvider provider, 
DetectionConfigDTO con
     return mergedAnomalies;
   }
 
+  /**
+   * Computes the anomaly weight as the relative deviation of the current value
+   * from the baseline: (current - baseline) / baseline.
+   *
+   * NOTE(review): a baseline of 0 presumably yields Infinity/NaN here; confirm
+   * that consumers of the weight tolerate that.
+   *
+   * @param anomaly anomaly whose average current and baseline values are set
+   * @return the change of the current value relative to the baseline
+   */
+  private double calculateWeight(MergedAnomalyResultDTO anomaly) {
+    final double baseline = anomaly.getAvgBaselineVal();
+    final double deviation = anomaly.getAvgCurrentVal() - baseline;
+    return deviation / baseline;
+  }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 1382d9ef7a..9ffc5c28c5 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,7 +20,6 @@
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
-import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,7 +36,7 @@
  */
 public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
   public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO 
config, long startTime, long endTime)
-      throws Exception {
+  {
     super(provider, config, startTime, endTime);
   }
 
@@ -105,6 +104,7 @@ private MergedAnomalyResultDTO 
copyAnomalyInfo(MergedAnomalyResultDTO anomaly, M
     newAnomaly.setFeedback(anomaly.getFeedback());
     newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
     newAnomaly.setScore(anomaly.getScore());
+    newAnomaly.setWeight(anomaly.getWeight());
     return newAnomaly;
   }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 45ecf73bd9..45cad5bfca 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -138,6 +138,7 @@
   private static final String PROP_WINDOW_UNIT = "windowUnit";
   private static final String PROP_FREQUENCY = "frequency";
   private static final String PROP_MERGER = "merger";
+  private static final String PROP_TIMEZONE = "timezone";
 
   private static final DetectionRegistry DETECTION_REGISTRY = 
DetectionRegistry.getInstance();
   private static final Map<String, String> DETECTOR_TO_BASELINE = 
ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE");
@@ -227,7 +228,6 @@ YamlTranslationResult translateYaml() {
     Map<String, Object> nestedProperties = new HashMap<>();
     nestedProperties.put(PROP_CLASS_NAME, 
AnomalyDetectorWrapper.class.getName());
     String detectorKey = makeComponentKey(ruleName, detectorType, id);
-    nestedProperties.put(PROP_DETECTOR, detectorKey);
 
     fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
 
@@ -242,6 +242,7 @@ YamlTranslationResult translateYaml() {
     }
     String baselineProviderKey = makeComponentKey(ruleName, 
baselineProviderType, id);
     properties.put(PROP_BASELINE_PROVIDER, baselineProviderKey);
+    properties.put(PROP_DETECTOR, detectorKey);
     buildComponentSpec(yamlConfig, baselineProviderType, baselineProviderKey);
     properties.putAll(this.mergerProperties);
     return properties;
@@ -283,6 +284,9 @@ private void fillInWindowSizeAndUnit(Map<String, Object> 
properties, Map<String,
       if (yamlConfig.containsKey(PROP_WINDOW_DELAY_UNIT)) {
         properties.put(PROP_WINDOW_DELAY_UNIT, MapUtils.getString(yamlConfig, 
PROP_WINDOW_DELAY_UNIT));
       }
+      if (yamlConfig.containsKey(PROP_TIMEZONE)){
+        properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, 
PROP_TIMEZONE));
+      }
     }
   }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
index ce1c03c49d..0d6891e5b6 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -40,11 +40,11 @@
   private static final String PROP_METRIC_URN = "metricUrn";
   private static final String PROP_MOVING_WINDOW_DETECTION = 
"isMovingWindowDetection";
   private static final String PROP_DETECTOR = "detector";
+  private static final String PROP_TIMEZONE = "timezone";
 
   private MockDataProvider provider;
   private Map<String, Object> properties;
   private DetectionConfigDTO config;
-  private Map<String, Object> stageSpecs;
 
   @BeforeMethod
   public void setUp() {
@@ -80,6 +80,7 @@ public void testMonitoringWindow() {
   @Test
   public void testMovingMonitoringWindow() {
     this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
     AnomalyDetectorWrapper detectionPipeline =
         new AnomalyDetectorWrapper(this.provider, this.config, 1540147725000L, 
1540493325000L);
     List<Interval> monitoringWindows = 
detectionPipeline.getMonitoringWindows();
@@ -89,4 +90,17 @@ public void testMovingMonitoringWindow() {
             new Interval(1540252800000L, 1540339200000L, timeZone), new 
Interval(1540339200000L, 1540425600000L, timeZone)));
   }
 
+  /**
+   * Moving-window detection whose start and end times equal the first and last
+   * expected interval bounds: four consecutive windows are expected.
+   */
+  @Test
+  public void testMovingMonitoringWindowBoundary() {
+    this.properties.put(PROP_MOVING_WINDOW_DETECTION, true);
+    this.properties.put(PROP_TIMEZONE, TimeSpec.DEFAULT_TIMEZONE);
+
+    AnomalyDetectorWrapper wrapper =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1540080000000L, 1540425600000L);
+    List<Interval> actualWindows = wrapper.getMonitoringWindows();
+
+    DateTimeZone zone = DateTimeZone.forID(TimeSpec.DEFAULT_TIMEZONE);
+    Interval first = new Interval(1540080000000L, 1540166400000L, zone);
+    Interval second = new Interval(1540166400000L, 1540252800000L, zone);
+    Interval third = new Interval(1540252800000L, 1540339200000L, zone);
+    Interval fourth = new Interval(1540339200000L, 1540425600000L, zone);
+
+    Assert.assertEquals(actualWindows, Arrays.asList(first, second, third, fourth));
+  }
+
+
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 998c8eb2d4..481c6f0995 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -17,10 +17,12 @@
 package com.linkedin.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableMap;
+import com.linkedin.thirdeye.constant.MetricAggFunction;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DefaultInputDataFetcher;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
@@ -95,16 +97,22 @@ public void beforeMethod() {
   @Test
   public void testMergerCurrentAndBaselineLoading() throws Exception {
     MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
+    anomaly.setProperties(ImmutableMap.of("detectorComponentName", 
"testDetector"));
     anomaly.setMetricUrn("thirdeye:metric:1");
 
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
     aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME 
+ ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
 
+    MetricConfigDTO metric = new MetricConfigDTO();
+    metric.setId(1L);
+    metric.setDefaultAggFunction(MetricAggFunction.SUM);
     DataProvider
-        provider = new MockDataProvider().setLoader(new 
MockPipelineLoader(this.runs, 
Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates);
+        provider = new MockDataProvider().setLoader(new 
MockPipelineLoader(this.runs, 
Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
+        .setMetrics(Collections.singletonList(metric));
 
     this.config.getProperties().put(PROP_MAX_GAP, 100);
     this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+    this.config.getProperties().put("detector", "$testDetector");
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 
100.0));
diff --git 
a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
 
b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
index 050043c540..65fb1b5979 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
+++ 
b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-1.json
@@ -17,9 +17,9 @@
             "className" : 
"com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
             "maxGap" : 0,
             "nested" : [ {
-              "className" : 
"com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-              "detector" : "$rule1:THRESHOLD:0"
+              "className" : 
"com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
             } ],
+            "detector" : "$rule1:THRESHOLD:0",
             "maxDuration" : 100
           } ]
         } ]
@@ -31,9 +31,9 @@
           "className" : 
"com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
           "maxGap" : 0,
           "nested" : [ {
-            "className" : 
"com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-            "detector" : "$rule2:THRESHOLD:0"
+            "className" : 
"com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
           } ],
+          "detector" : "$rule2:THRESHOLD:0",
           "maxDuration" : 100
         } ]
       } ],
diff --git 
a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
 
b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
index 2d875a4ddc..bd43106dde 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
+++ 
b/thirdeye/thirdeye-pinot/src/test/resources/com/linkedin/thirdeye/detection/yaml/compositePipelineTranslatorTestResult-2.json
@@ -9,9 +9,9 @@
         "baselineValueProvider" : "$rule1:RULE_BASELINE:0",
         "className" : 
"com.linkedin.thirdeye.detection.wrapper.BaselineFillingMergeWrapper",
         "nested" : [ {
-          "className" : 
"com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper",
-          "detector" : "$rule1:THRESHOLD:0"
-        } ]
+          "className" : 
"com.linkedin.thirdeye.detection.wrapper.AnomalyDetectorWrapper"
+        } ],
+        "detector" : "$rule1:THRESHOLD:0"
       } ],
       "minContribution" : 0.05,
       "dimensions" : [ "D1", "D2" ]


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to