apucher closed pull request #3570: [TE] detection - filter out child anomaly in 
anomaly fetching
URL: https://github.com/apache/incubator-pinot/pull/3570
 
 
   

This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index f600c44212..c5f6ec362c 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -17,6 +17,7 @@
 package com.linkedin.thirdeye.detection;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.Multimap;
 import com.linkedin.thirdeye.dataframe.DataFrame;
 import com.linkedin.thirdeye.dataframe.util.MetricSlice;
@@ -148,7 +149,7 @@ public DataFrame call() throws Exception {
 
       if (predicates.isEmpty()) throw new IllegalArgumentException("Must 
provide at least one of start, end, or " + functionIdKey);
 
-      List<MergedAnomalyResultDTO> anomalies = 
this.anomalyDAO.findByPredicate(AND(predicates));
+      Collection<MergedAnomalyResultDTO> anomalies = 
this.anomalyDAO.findByPredicate(AND(predicates));
       anomalies.removeIf(anomaly -> !slice.match(anomaly));
 
       if (isLegacy) {
@@ -160,12 +161,12 @@ public DataFrame call() throws Exception {
             (configId >= 0) && (anomaly.getDetectionConfigId() == null || 
anomaly.getDetectionConfigId() != configId)
         );
       }
+      // filter all child anomalies. those are kept in the parent anomaly 
children set.
+      anomalies = Collections2.filter(anomalies, mergedAnomaly -> 
mergedAnomaly != null && !mergedAnomaly.isChild());
 
-      LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {}) 
with confid Id = {}", anomalies.size(),
-          slice.getStart(), slice.getEnd(), configId);
+      LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {}) 
with confid Id = {}", anomalies.size(), slice.getStart(), slice.getEnd(), 
configId);
       output.putAll(slice, anomalies);
     }
-
     return output;
   }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index 3b15748f40..d4035aaf8e 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -16,6 +16,7 @@
 
 package com.linkedin.thirdeye.detection;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import 
com.linkedin.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
 import com.linkedin.thirdeye.datalayer.bao.AnomalyFunctionManager;
@@ -94,8 +95,9 @@ public DetectionMigrationResource(MetricConfigManager 
metricConfigDAO, AnomalyFu
   @GET
   public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws 
Exception {
     AnomalyFunctionDTO anomalyFunctionDTO = 
this.anomalyFunctionDAO.findById(anomalyFunctionId);
+    Preconditions.checkArgument(anomalyFunctionDTO.getIsActive(), "try to 
migrate inactive anomaly function");
     Map<String, Object> yamlConfigs = new LinkedHashMap<>();
-    yamlConfigs.put("detectionName", anomalyFunctionDTO.getFunctionName());
+    yamlConfigs.put("detectionName", "new_pipeline_" + 
anomalyFunctionDTO.getFunctionName());
     yamlConfigs.put("metric", anomalyFunctionDTO.getMetric());
     yamlConfigs.put("dataset", anomalyFunctionDTO.getCollection());
     yamlConfigs.put("pipelineType", "Composite");
@@ -192,8 +194,8 @@ public String migrateToYaml(@QueryParam("id") long 
anomalyFunctionId) throws Exc
     Map<String, Object> params = new HashMap<>();
     filterYamlParams.put("configuration", params);
     params.putAll(functionDTO.getAlertFilter());
-    params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
-    params.put("variables.timeZone", getTimezone(functionDTO));
+    params.put("bucketPeriod", getBucketPeriod(functionDTO));
+    params.put("timeZone", getTimezone(functionDTO));
     return filterYamlParams;
   }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index e6dcad4d39..a51a3a62c7 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -41,6 +41,8 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -49,6 +51,7 @@
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.MapUtils;
+import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,7 +158,7 @@ public Response detectionPreview(
     DetectionPipelineResult result = pipeline.run();
 
     if (diagnostics == null || !diagnostics) {
-      result.setDiagnostics(Collections.<String, Object>emptyMap());
+      result.setDiagnostics(Collections.emptyMap());
     }
 
     return Response.ok(result).build();
@@ -163,27 +166,28 @@ public Response detectionPreview(
 
   @POST
   @Path("/replay/{id}")
-  public Response detectionReplay(
-      @PathParam("id") long configId,
-      @QueryParam("start") long start,
-      @QueryParam("end") long end) throws Exception {
+  public Response detectionReplay(@PathParam("id") long configId, 
@QueryParam("start") long start,
+      @QueryParam("end") long end,
+      @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean 
deleteExistingAnomaly) throws Exception {
 
     DetectionConfigDTO config = this.configDAO.findById(configId);
     if (config == null) {
       throw new IllegalArgumentException(String.format("Cannot find config 
%d", configId));
     }
 
-    // clear existing anomalies
     AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
-    Collection<MergedAnomalyResultDTO> existing = 
this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
-
-    List<Long> existingIds = new ArrayList<>();
-    for (MergedAnomalyResultDTO anomaly : existing) {
-      existingIds.add(anomaly.getId());
+    if (deleteExistingAnomaly) {
+      // clear existing anomalies
+      Collection<MergedAnomalyResultDTO> existing =
+          this.provider.fetchAnomalies(Collections.singleton(slice), 
configId).get(slice);
+
+      List<Long> existingIds = new ArrayList<>();
+      for (MergedAnomalyResultDTO anomaly : existing) {
+        existingIds.add(anomaly.getId());
+      }
+      this.anomalyDAO.deleteByIds(existingIds);
     }
 
-    this.anomalyDAO.deleteByIds(existingIds);
-
     // execute replay
     DetectionPipeline pipeline = this.loader.from(this.provider, config, 
start, end);
     DetectionPipelineResult result = pipeline.run();
@@ -200,6 +204,7 @@ public Response detectionReplay(
       this.anomalyDAO.save(anomaly);
     }
 
-    return Response.ok(result).build();
-  }
+    LOG.info("replay detection pipeline {} generated {} anomalies.", 
config.getId(), result.getAnomalies().size());
+    return Response.ok(result.getAnomalies()).build();
   }
+}
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 144bf5ff1a..43f9c27cea 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -30,9 +30,11 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.commons.collections.MapUtils;
 
 
@@ -44,6 +46,7 @@
   private static final String PROP_NESTED = "nested";
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_MERGE_KEY = "mergeKey";
+  private static final String PROP_DETECTOR_COMPONENT_KEY = 
"detectorComponentKey";
 
   protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new 
Comparator<MergedAnomalyResultDTO>() {
     @Override
@@ -115,22 +118,23 @@ public DetectionPipelineResult run() throws Exception {
       i++;
     }
 
-    // retrieve anomalies
-    AnomalySlice effectiveSlice = this.slice
-        .withStart(this.getStartTime(generated) - this.maxGap - 1)
-        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
-
-    List<MergedAnomalyResultDTO> retrieved = new ArrayList<>();
-    
retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice),
 this.config.getId()).get(effectiveSlice));
-
     // merge
-    List<MergedAnomalyResultDTO> all = new ArrayList<>();
-    all.addAll(retrieved);
+    Set<MergedAnomalyResultDTO> all = new HashSet<>();
+    all.addAll(retrieveAnomaliesFromDatabase(generated));
     all.addAll(generated);
 
     return new 
DetectionPipelineResult(this.merge(all)).setDiagnostics(diagnostics);
   }
 
+  protected List<MergedAnomalyResultDTO> 
retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    AnomalySlice effectiveSlice = this.slice
+        .withStart(this.getStartTime(generated) - this.maxGap - 1)
+        .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+    return new 
ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice), 
this.config.getId()).get(effectiveSlice));
+  }
+
+  // logic to do time-based merging.
   protected List<MergedAnomalyResultDTO> 
merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
     Collections.sort(input, COMPARATOR);
@@ -198,16 +202,19 @@ private long getEndTime(Iterable<MergedAnomalyResultDTO> 
anomalies) {
     final String collection;
     final DimensionMap dimensions;
     final String mergeKey;
+    final String componentKey;
 
-    public AnomalyKey(String metric, String collection, DimensionMap 
dimensions, String mergeKey) {
+    public AnomalyKey(String metric, String collection, DimensionMap 
dimensions, String mergeKey, String componentKey) {
       this.metric = metric;
       this.collection = collection;
       this.dimensions = dimensions;
       this.mergeKey = mergeKey;
+      this.componentKey = componentKey;
     }
 
     public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
-      return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), 
anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY));
+      return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), 
anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
+          anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
     }
 
     @Override
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index d743971439..64ffc47bb8 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -110,7 +110,7 @@ public BaselineFillingMergeWrapper(DataProvider provider, 
DetectionConfigDTO con
         }
       } catch (Exception e) {
         // ignore
-        LOG.warn("cannot get metric slice for anomaly {}", anomaly, e);
+        LOG.warn("cannot get current or baseline value for anomaly {}. ", 
anomaly, e);
       }
     }
     return mergedAnomalies;
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 77536f9a55..1382d9ef7a 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,6 +20,7 @@
 import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,6 +41,12 @@ public ChildKeepingMergeWrapper(DataProvider provider, 
DetectionConfigDTO config
     super(provider, config, startTime, endTime);
   }
 
+  @Override
+  // does not fetch any anomalies from database
+  protected List<MergedAnomalyResultDTO> 
retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+    return Collections.emptyList();
+  }
+
   @Override
   protected List<MergedAnomalyResultDTO> 
merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
@@ -54,7 +61,7 @@ public ChildKeepingMergeWrapper(DataProvider provider, 
DetectionConfigDTO config
       }
 
       MergeWrapper.AnomalyKey
-          key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), 
anomaly.getCollection(), anomaly.getDimensions(), "");
+          key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), 
anomaly.getCollection(), anomaly.getDimensions(), "", "");
       MergedAnomalyResultDTO parent = parents.get(key);
 
       if (parent == null || anomaly.getStartTime() - parent.getEndTime() > 
this.maxGap) {
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 4162940422..4a75289d6d 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -18,14 +18,12 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import com.linkedin.thirdeye.detection.DataProvider;
 import com.linkedin.thirdeye.detection.DetectionPipelineResult;
 import com.linkedin.thirdeye.detection.MockDataProvider;
 import com.linkedin.thirdeye.detection.MockPipeline;
 import com.linkedin.thirdeye.detection.MockPipelineLoader;
 import com.linkedin.thirdeye.detection.MockPipelineOutput;
-import com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -86,26 +84,23 @@ public void beforeMethod() {
     this.config.setName(PROP_NAME_VALUE);
     this.config.setProperties(this.properties);
 
-    List<MergedAnomalyResultDTO> existing = new ArrayList<>();
-    existing.add(makeAnomaly(0, 1000));
-    existing.add(makeAnomaly(1500, 2000));
-
     this.outputs = new ArrayList<>();
 
     this.outputs.add(new MockPipelineOutput(Arrays.asList(
         makeAnomaly(1100, 1200),
-        makeAnomaly(2200, 2300)
+        makeAnomaly(2200, 2300),
+        makeAnomaly(0, 1000)
     ), 2900));
 
     this.outputs.add(new MockPipelineOutput(Arrays.asList(
         makeAnomaly(1150, 1250),
-        makeAnomaly(2400, 2800)
+        makeAnomaly(2400, 2800),
+        makeAnomaly(1500, 2000)
     ), 3000));
 
     MockPipelineLoader mockLoader = new MockPipelineLoader(this.runs, 
this.outputs);
 
     this.provider = new MockDataProvider()
-        .setAnomalies(existing)
         .setLoader(mockLoader);
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to