This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 16c46f3 [TE] catch exceptions in dimension exploration (#4136)
16c46f3 is described below
commit 16c46f3d283e4dae95250841d4b78e71e4988cb2
Author: Jihao Zhang <[email protected]>
AuthorDate: Tue Apr 23 11:06:44 2019 -0700
[TE] catch exceptions in dimension exploration (#4136)
Previously, an exception thrown while exploring one dimension failed the whole
task. Detection should continue to run and save the anomalies detected in the
other dimensions rather than failing the whole task. This matters especially
when the task is big (exploring thousands of dimensions): the results for the
other dimensions are wasted and the later catch-up task becomes more
expensive. This PR implements this behavior.
---
.../detection/DetectionPipelineException.java | 22 ++++++++++
.../detection/algorithm/DimensionWrapper.java | 48 ++++++++++++++++++----
.../detection/wrapper/AnomalyDetectorWrapper.java | 27 +++++++++---
3 files changed, 84 insertions(+), 13 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineException.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineException.java
new file mode 100644
index 0000000..c65efa6
--- /dev/null
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineException.java
@@ -0,0 +1,22 @@
+package org.apache.pinot.thirdeye.detection;
+
+/**
+ * The exception to be thrown in detection pipelines.
+ */
+public class DetectionPipelineException extends Exception {
+ public DetectionPipelineException(Throwable cause) {
+ super(cause);
+ }
+
+ public DetectionPipelineException() {
+ super();
+ }
+
+ public DetectionPipelineException(String message) {
+ super(message);
+ }
+
+ public DetectionPipelineException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index 09c36d4..8950d52 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -30,6 +30,7 @@ import
org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.detection.ConfigUtils;
import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineException;
import org.apache.pinot.thirdeye.detection.DetectionPipeline;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
@@ -69,6 +70,7 @@ public class DimensionWrapper extends DetectionPipeline {
private static final String PROP_NESTED_METRIC_URNS = "nestedMetricUrns";
private static final String PROP_CLASS_NAME = "className";
+ private static final int EARLY_STOP_THRESHOLD = 10;
private final String metricUrn;
private final int k;
@@ -198,21 +200,51 @@ public class DimensionWrapper extends DetectionPipeline {
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
Map<String, Object> diagnostics = new HashMap<>();
Set<Long> lastTimeStamps = new HashSet<>();
- LOG.info("exploring {} metrics", nestedMetrics.size());
- for (MetricEntity metric : nestedMetrics) {
- for (Map<String, Object> properties : this.nestedProperties) {
- LOG.info("running detection for {}", metric.toString());
- DetectionPipelineResult intermediate = this.runNested(metric,
properties);
- lastTimeStamps.add(intermediate.getLastTimestamp());
- anomalies.addAll(intermediate.getAnomalies());
- diagnostics.put(metric.getUrn(), intermediate.getDiagnostics());
+ long totalNestedMetrics = nestedMetrics.size();
+ long successNestedMetrics = 0; // record the number of successfully
explored dimensions
+ LOG.info("exploring {} metrics", totalNestedMetrics);
+ for (int i = 0; i < totalNestedMetrics; i++) {
+ checkEarlyStop(totalNestedMetrics, successNestedMetrics, i);
+ MetricEntity metric = nestedMetrics.get(i);
+ try {
+ LOG.info("running detection for metric urn {}. {}/{}",
metric.getUrn(), i + 1, totalNestedMetrics);
+ for (Map<String, Object> properties : this.nestedProperties) {
+ DetectionPipelineResult intermediate = this.runNested(metric,
properties);
+ lastTimeStamps.add(intermediate.getLastTimestamp());
+ anomalies.addAll(intermediate.getAnomalies());
+ diagnostics.put(metric.getUrn(), intermediate.getDiagnostics());
+ }
+ successNestedMetrics++;
+ } catch (Exception e) {
+ LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to
{} failed for metric urn {}.",
+ this.config.getId(), this.start, this.end, metric.getUrn(), e);
}
}
+ checkDimensionExploreStatus(totalNestedMetrics, successNestedMetrics);
return new DetectionPipelineResult(anomalies,
DetectionUtils.consolidateNestedLastTimeStamps(lastTimeStamps))
.setDiagnostics(diagnostics);
}
+ private void checkEarlyStop(long totalNestedMetrics, long
successNestedMetrics, int i) throws DetectionPipelineException {
+ // if the first certain number of dimensions all failed, throw an exception
+ if (i == EARLY_STOP_THRESHOLD && successNestedMetrics == 0) {
+ throw new DetectionPipelineException(String.format(
+ "Detection failed for first %d out of %d metric dimensions for
monitoring window %d to %d, stop dimension explore.",
+ i, totalNestedMetrics, this.getStartTime(), this.getEndTime()));
+ }
+ }
+
+ private void checkDimensionExploreStatus(long totalNestedMetrics, long
successNestedMetrics)
+ throws DetectionPipelineException {
+ // if all dimension explore failed, throw an exception
+ if (successNestedMetrics == 0 && totalNestedMetrics > 0) {
+ throw new DetectionPipelineException(String.format(
+ "Detection failed for all nested dimensions for detection config id
%d for monitoring window %d to %d, stop dimension explore.",
+ this.config.getId(), this.getStartTime(), this.getEndTime()));
+ }
+ }
+
private boolean checkMinLiveZone(MetricEntity me) {
MetricSlice slice = MetricSlice.from(me.getId(), this.start.getMillis(),
this.end.getMillis(), me.getFilters());
DataFrame df =
this.provider.fetchTimeseries(Collections.singleton(slice)).get(slice);
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index bfd8365..3d36eb1 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineException;
import org.apache.pinot.thirdeye.detection.DetectionPipeline;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
@@ -160,11 +161,7 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
int totalWindows = monitoringWindows.size();
int successWindows = 0;
for (int i = 0; i < totalWindows; i++) {
- if (i == EARLY_TERMINATE_WINDOW && successWindows == 0) {
- LOG.error("Successive first {}/{} detection windows failed for config
{} metricUrn {}. Discard remaining windows",
- EARLY_TERMINATE_WINDOW, totalWindows, config.getId(), metricUrn);
- break;
- }
+ checkEarlyStop(totalWindows, successWindows, i);
// run detection
Interval window = monitoringWindows.get(i);
@@ -188,6 +185,8 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
anomalies.addAll(anomaliesForOneWindow);
}
+ checkMovingWindowDetectionStatus(totalWindows, successWindows);
+
for (MergedAnomalyResultDTO anomaly : anomalies) {
anomaly.setDetectionConfigId(this.config.getId());
anomaly.setMetricUrn(this.metricUrn);
@@ -201,6 +200,24 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
Collectors.toList()), lastTimeStamp);
}
+ private void checkEarlyStop(int totalWindows, int successWindows, int i)
throws DetectionPipelineException {
+ // if the first certain number of windows all failed, throw an exception
+ if (i == EARLY_TERMINATE_WINDOW && successWindows == 0) {
+ throw new DetectionPipelineException(String.format(
+ "Successive first %d/%d detection windows failed for config %d
metricUrn %s for monitoring window %d to %d. Discard remaining windows",
+ EARLY_TERMINATE_WINDOW, totalWindows, config.getId(), metricUrn,
this.getStartTime(), this.getEndTime()));
+ }
+ }
+
+ private void checkMovingWindowDetectionStatus(int totalWindows, int
successWindows) throws DetectionPipelineException {
+ // if all moving window detection failed, throw an exception
+ if (successWindows == 0 && totalWindows > 0) {
+ throw new DetectionPipelineException(String.format(
+ "Detection failed for all windows for detection config id %d
detector %s for monitoring window %d to %d.",
+ this.config.getId(), this.detectorName, this.getStartTime(),
this.getEndTime()));
+ }
+ }
+
// guess-timate next time stamp
// there are two cases. If the data is complete, next detection starts from
the end time of this detection
// If data is incomplete, next detection starts from the latest available
data's time stamp plus the one time granularity.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]