apucher closed pull request #3570: [TE] detection - filter out child anomaly in
anomaly fetching
URL: https://github.com/apache/incubator-pinot/pull/3570
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index f600c44212..c5f6ec362c 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -17,6 +17,7 @@
package com.linkedin.thirdeye.detection;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
import com.google.common.collect.Multimap;
import com.linkedin.thirdeye.dataframe.DataFrame;
import com.linkedin.thirdeye.dataframe.util.MetricSlice;
@@ -148,7 +149,7 @@ public DataFrame call() throws Exception {
if (predicates.isEmpty()) throw new IllegalArgumentException("Must
provide at least one of start, end, or " + functionIdKey);
- List<MergedAnomalyResultDTO> anomalies =
this.anomalyDAO.findByPredicate(AND(predicates));
+ Collection<MergedAnomalyResultDTO> anomalies =
this.anomalyDAO.findByPredicate(AND(predicates));
anomalies.removeIf(anomaly -> !slice.match(anomaly));
if (isLegacy) {
@@ -160,12 +161,12 @@ public DataFrame call() throws Exception {
(configId >= 0) && (anomaly.getDetectionConfigId() == null ||
anomaly.getDetectionConfigId() != configId)
);
}
+ // filter all child anomalies. those are kept in the parent anomaly
children set.
+ anomalies = Collections2.filter(anomalies, mergedAnomaly ->
mergedAnomaly != null && !mergedAnomaly.isChild());
- LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {})
with confid Id = {}", anomalies.size(),
- slice.getStart(), slice.getEnd(), configId);
+ LOG.info("Fetched {} anomalies between (startTime = {}, endTime = {})
with confid Id = {}", anomalies.size(), slice.getStart(), slice.getEnd(),
configId);
output.putAll(slice, anomalies);
}
-
return output;
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
index 3b15748f40..d4035aaf8e 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionMigrationResource.java
@@ -16,6 +16,7 @@
package com.linkedin.thirdeye.detection;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import
com.linkedin.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
import com.linkedin.thirdeye.datalayer.bao.AnomalyFunctionManager;
@@ -94,8 +95,9 @@ public DetectionMigrationResource(MetricConfigManager
metricConfigDAO, AnomalyFu
@GET
public String migrateToYaml(@QueryParam("id") long anomalyFunctionId) throws
Exception {
AnomalyFunctionDTO anomalyFunctionDTO =
this.anomalyFunctionDAO.findById(anomalyFunctionId);
+ Preconditions.checkArgument(anomalyFunctionDTO.getIsActive(), "try to
migrate inactive anomaly function");
Map<String, Object> yamlConfigs = new LinkedHashMap<>();
- yamlConfigs.put("detectionName", anomalyFunctionDTO.getFunctionName());
+ yamlConfigs.put("detectionName", "new_pipeline_" +
anomalyFunctionDTO.getFunctionName());
yamlConfigs.put("metric", anomalyFunctionDTO.getMetric());
yamlConfigs.put("dataset", anomalyFunctionDTO.getCollection());
yamlConfigs.put("pipelineType", "Composite");
@@ -192,8 +194,8 @@ public String migrateToYaml(@QueryParam("id") long
anomalyFunctionId) throws Exc
Map<String, Object> params = new HashMap<>();
filterYamlParams.put("configuration", params);
params.putAll(functionDTO.getAlertFilter());
- params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
- params.put("variables.timeZone", getTimezone(functionDTO));
+ params.put("bucketPeriod", getBucketPeriod(functionDTO));
+ params.put("timeZone", getTimezone(functionDTO));
return filterYamlParams;
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
index e6dcad4d39..a51a3a62c7 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionResource.java
@@ -41,6 +41,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@@ -49,6 +51,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.collections.MapUtils;
+import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,7 +158,7 @@ public Response detectionPreview(
DetectionPipelineResult result = pipeline.run();
if (diagnostics == null || !diagnostics) {
- result.setDiagnostics(Collections.<String, Object>emptyMap());
+ result.setDiagnostics(Collections.emptyMap());
}
return Response.ok(result).build();
@@ -163,27 +166,28 @@ public Response detectionPreview(
@POST
@Path("/replay/{id}")
- public Response detectionReplay(
- @PathParam("id") long configId,
- @QueryParam("start") long start,
- @QueryParam("end") long end) throws Exception {
+ public Response detectionReplay(@PathParam("id") long configId,
@QueryParam("start") long start,
+ @QueryParam("end") long end,
+ @QueryParam("deleteExistingAnomaly") @DefaultValue("false") boolean
deleteExistingAnomaly) throws Exception {
DetectionConfigDTO config = this.configDAO.findById(configId);
if (config == null) {
throw new IllegalArgumentException(String.format("Cannot find config
%d", configId));
}
- // clear existing anomalies
AnomalySlice slice = new AnomalySlice().withStart(start).withEnd(end);
- Collection<MergedAnomalyResultDTO> existing =
this.provider.fetchAnomalies(Collections.singleton(slice), configId).get(slice);
-
- List<Long> existingIds = new ArrayList<>();
- for (MergedAnomalyResultDTO anomaly : existing) {
- existingIds.add(anomaly.getId());
+ if (deleteExistingAnomaly) {
+ // clear existing anomalies
+ Collection<MergedAnomalyResultDTO> existing =
+ this.provider.fetchAnomalies(Collections.singleton(slice),
configId).get(slice);
+
+ List<Long> existingIds = new ArrayList<>();
+ for (MergedAnomalyResultDTO anomaly : existing) {
+ existingIds.add(anomaly.getId());
+ }
+ this.anomalyDAO.deleteByIds(existingIds);
}
- this.anomalyDAO.deleteByIds(existingIds);
-
// execute replay
DetectionPipeline pipeline = this.loader.from(this.provider, config,
start, end);
DetectionPipelineResult result = pipeline.run();
@@ -200,6 +204,7 @@ public Response detectionReplay(
this.anomalyDAO.save(anomaly);
}
- return Response.ok(result).build();
- }
+ LOG.info("replay detection pipeline {} generated {} anomalies.",
config.getId(), result.getAnomalies().size());
+ return Response.ok(result.getAnomalies()).build();
}
+}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
index 144bf5ff1a..43f9c27cea 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/algorithm/MergeWrapper.java
@@ -30,9 +30,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.commons.collections.MapUtils;
@@ -44,6 +46,7 @@
private static final String PROP_NESTED = "nested";
private static final String PROP_CLASS_NAME = "className";
private static final String PROP_MERGE_KEY = "mergeKey";
+ private static final String PROP_DETECTOR_COMPONENT_KEY =
"detectorComponentKey";
protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new
Comparator<MergedAnomalyResultDTO>() {
@Override
@@ -115,22 +118,23 @@ public DetectionPipelineResult run() throws Exception {
i++;
}
- // retrieve anomalies
- AnomalySlice effectiveSlice = this.slice
- .withStart(this.getStartTime(generated) - this.maxGap - 1)
- .withEnd(this.getEndTime(generated) + this.maxGap + 1);
-
- List<MergedAnomalyResultDTO> retrieved = new ArrayList<>();
-
retrieved.addAll(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice),
this.config.getId()).get(effectiveSlice));
-
// merge
- List<MergedAnomalyResultDTO> all = new ArrayList<>();
- all.addAll(retrieved);
+ Set<MergedAnomalyResultDTO> all = new HashSet<>();
+ all.addAll(retrieveAnomaliesFromDatabase(generated));
all.addAll(generated);
return new
DetectionPipelineResult(this.merge(all)).setDiagnostics(diagnostics);
}
+ protected List<MergedAnomalyResultDTO>
retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+ AnomalySlice effectiveSlice = this.slice
+ .withStart(this.getStartTime(generated) - this.maxGap - 1)
+ .withEnd(this.getEndTime(generated) + this.maxGap + 1);
+
+ return new
ArrayList<>(this.provider.fetchAnomalies(Collections.singleton(effectiveSlice),
this.config.getId()).get(effectiveSlice));
+ }
+
+ // logic to do time-based merging.
protected List<MergedAnomalyResultDTO>
merge(Collection<MergedAnomalyResultDTO> anomalies) {
List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
Collections.sort(input, COMPARATOR);
@@ -198,16 +202,19 @@ private long getEndTime(Iterable<MergedAnomalyResultDTO>
anomalies) {
final String collection;
final DimensionMap dimensions;
final String mergeKey;
+ final String componentKey;
- public AnomalyKey(String metric, String collection, DimensionMap
dimensions, String mergeKey) {
+ public AnomalyKey(String metric, String collection, DimensionMap
dimensions, String mergeKey, String componentKey) {
this.metric = metric;
this.collection = collection;
this.dimensions = dimensions;
this.mergeKey = mergeKey;
+ this.componentKey = componentKey;
}
public static AnomalyKey from(MergedAnomalyResultDTO anomaly) {
- return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(),
anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY));
+ return new AnomalyKey(anomaly.getMetric(), anomaly.getCollection(),
anomaly.getDimensions(), anomaly.getProperties().get(PROP_MERGE_KEY),
+ anomaly.getProperties().get(PROP_DETECTOR_COMPONENT_KEY));
}
@Override
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index d743971439..64ffc47bb8 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -110,7 +110,7 @@ public BaselineFillingMergeWrapper(DataProvider provider,
DetectionConfigDTO con
}
} catch (Exception e) {
// ignore
- LOG.warn("cannot get metric slice for anomaly {}", anomaly, e);
+ LOG.warn("cannot get current or baseline value for anomaly {}. ",
anomaly, e);
}
}
return mergedAnomalies;
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index 77536f9a55..1382d9ef7a 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -20,6 +20,7 @@
import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.algorithm.MergeWrapper;
+import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -40,6 +41,12 @@ public ChildKeepingMergeWrapper(DataProvider provider,
DetectionConfigDTO config
super(provider, config, startTime, endTime);
}
+ @Override
+ // does not fetch any anomalies from database
+ protected List<MergedAnomalyResultDTO>
retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
+ return Collections.emptyList();
+ }
+
@Override
protected List<MergedAnomalyResultDTO>
merge(Collection<MergedAnomalyResultDTO> anomalies) {
List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
@@ -54,7 +61,7 @@ public ChildKeepingMergeWrapper(DataProvider provider,
DetectionConfigDTO config
}
MergeWrapper.AnomalyKey
- key = new MergeWrapper.AnomalyKey(anomaly.getMetric(),
anomaly.getCollection(), anomaly.getDimensions(), "");
+ key = new MergeWrapper.AnomalyKey(anomaly.getMetric(),
anomaly.getCollection(), anomaly.getDimensions(), "", "");
MergedAnomalyResultDTO parent = parents.get(key);
if (parent == null || anomaly.getStartTime() - parent.getEndTime() >
this.maxGap) {
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 4162940422..4a75289d6d 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -18,14 +18,12 @@
import com.google.common.collect.ImmutableSet;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
-import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import com.linkedin.thirdeye.detection.DataProvider;
import com.linkedin.thirdeye.detection.DetectionPipelineResult;
import com.linkedin.thirdeye.detection.MockDataProvider;
import com.linkedin.thirdeye.detection.MockPipeline;
import com.linkedin.thirdeye.detection.MockPipelineLoader;
import com.linkedin.thirdeye.detection.MockPipelineOutput;
-import com.linkedin.thirdeye.detection.wrapper.ChildKeepingMergeWrapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -86,26 +84,23 @@ public void beforeMethod() {
this.config.setName(PROP_NAME_VALUE);
this.config.setProperties(this.properties);
- List<MergedAnomalyResultDTO> existing = new ArrayList<>();
- existing.add(makeAnomaly(0, 1000));
- existing.add(makeAnomaly(1500, 2000));
-
this.outputs = new ArrayList<>();
this.outputs.add(new MockPipelineOutput(Arrays.asList(
makeAnomaly(1100, 1200),
- makeAnomaly(2200, 2300)
+ makeAnomaly(2200, 2300),
+ makeAnomaly(0, 1000)
), 2900));
this.outputs.add(new MockPipelineOutput(Arrays.asList(
makeAnomaly(1150, 1250),
- makeAnomaly(2400, 2800)
+ makeAnomaly(2400, 2800),
+ makeAnomaly(1500, 2000)
), 3000));
MockPipelineLoader mockLoader = new MockPipelineLoader(this.runs,
this.outputs);
this.provider = new MockDataProvider()
- .setAnomalies(existing)
.setLoader(mockLoader);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]