This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 334b918 Fail pipeline all windows failed (#4032)
334b918 is described below
commit 334b918c2e7832776baf52e62bb48d4c806e6fe0
Author: Xiaohui Sun <[email protected]>
AuthorDate: Fri Mar 29 13:12:38 2019 -0700
Fail pipeline all windows failed (#4032)
* [TE] Reduce log for data insufficient exception.
* [TE] fail the pipeline if all detection windows failed
---
.../thirdeye/detection/algorithm/MergeWrapper.java | 5 ++++
.../detection/wrapper/AnomalyDetectorWrapper.java | 27 +++++++++++++++++-----
2 files changed, 26 insertions(+), 6 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index d4dabf1..83be36c 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -38,9 +38,12 @@ import
org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.detection.ConfigUtils;
import org.apache.pinot.thirdeye.detection.DataProvider;
import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineJob;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -53,6 +56,7 @@ public class MergeWrapper extends DetectionPipeline {
private static final String PROP_MERGE_KEY = "mergeKey";
private static final String PROP_DETECTOR_COMPONENT_NAME =
"detectorComponentName";
private static final int NUMBER_OF_SPLITED_ANOMALIES_LIMIT = 1000;
+ private static final Logger LOG =
LoggerFactory.getLogger(MergeWrapper.class);
protected static final Comparator<MergedAnomalyResultDTO> COMPARATOR = new
Comparator<MergedAnomalyResultDTO>() {
@Override
@@ -212,6 +216,7 @@ public class MergeWrapper extends DetectionPipeline {
int anomalyCountAfterSplit = (int) Math.ceil((anomaly.getEndTime() -
anomaly.getStartTime()) / (double) maxDuration);
if (anomalyCountAfterSplit > NUMBER_OF_SPLITED_ANOMALIES_LIMIT) {
// if the number of anomalies after split is more than the limit, don't
split
+ LOG.warn("Exceeded max number of split count. maxDuration = {}, anomaly
split count = {}", maxDuration, anomalyCountAfterSplit);
return Collections.singleton(anomaly);
}
Set<MergedAnomalyResultDTO> result = new HashSet<>();
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 09a64bb..e8b12db 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -41,6 +41,8 @@ import org.apache.pinot.thirdeye.detection.DetectionPipeline;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import
org.apache.pinot.thirdeye.detection.spi.exception.DetectorDataInsufficientException;
+import org.apache.pinot.thirdeye.detection.spi.exception.DetectorException;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
import org.joda.time.DateTime;
@@ -73,7 +75,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline
{
private static final String PROP_TIMEZONE = "timezone";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
private static final String PROP_CACHE_PERIOD_LOOKBACK =
"cachingPeriodLookback";
- private static final long DEFAULT_CACHING_PERIOD_LOOKBACK =
TimeUnit.DAYS.toMillis(30);
+ private static final long DEFAULT_CACHING_PERIOD_LOOKBACK =
TimeUnit.DAYS.toMillis(0);
private static final long CACHING_PERIOD_LOOKBACK_DAILY =
TimeUnit.DAYS.toMillis(90);
private static final long CACHING_PERIOD_LOOKBACK_HOURLY =
TimeUnit.DAYS.toMillis(60);
// disable minute level cache warm up
@@ -144,7 +146,7 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
// 1. get the last time stamp for the time series.
// 2. to calculate current values and baseline values for the anomalies
detected
// 3. anomaly detection current and baseline time series value
- if( this.cachingPeriodLookback >= 0) {
+ if( this.cachingPeriodLookback > 0) {
MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(),
startTime - cachingPeriodLookback, endTime,
this.metricEntity.getFilters());
this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
@@ -152,18 +154,31 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
List<Interval> monitoringWindows = this.getMonitoringWindows();
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
- for (Interval window : monitoringWindows) {
+ int successWindows = 0;
+ for (int i = 0; i < monitoringWindows.size(); i++) {
+ Interval window = monitoringWindows.get(i);
List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
try {
- LOG.info("[New Pipeline] running detection for config {} metricUrn {}.
start time {}, end time {}", config.getId(), metricUrn, window.getStart(),
window.getEnd());
+ LOG.info("[Pipeline] running detection for config {} metricUrn {}
window ({}/{}) - start {} end {}",
+ config.getId(), metricUrn, i, monitoringWindows.size(),
window.getStart(), window.getEnd());
long ts = System.currentTimeMillis();
anomaliesForOneWindow = anomalyDetector.runDetection(window,
this.metricUrn);
- LOG.info("[New Pipeline] run anomaly detection for window {} - {} used
{} milliseconds, detected {} anomalies", window.getStart(), window.getEnd(),
System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
- } catch (Exception e) {
+ LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start
{} end {} used {} milliseconds, detected {} anomalies",
+ i, monitoringWindows.size(), window.getStart(), window.getEnd(),
System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
+ successWindows++;
+ }
+ catch (DetectorDataInsufficientException e) {
+ LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for
window {} to {}.", this.config.getId(), window.getStart(), window.getEnd());
+ }
+ catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to
{} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
}
anomalies.addAll(anomaliesForOneWindow);
}
+ if (successWindows == 0 && monitoringWindows.size() > 0) {
+ LOG.error("All {} detection windows failed for config {} metricUrn {}.",
monitoringWindows.size(), config.getId(), metricUrn);
+ throw new DetectorException("All detection windows failed.");
+ }
for (MergedAnomalyResultDTO anomaly : anomalies) {
anomaly.setDetectionConfigId(this.config.getId());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]