This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch add_early_terminate_detection in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 28b63210581908cb734b76112dea72fb219c4c11 Author: Xiaohui Sun <[email protected]> AuthorDate: Mon Apr 1 10:20:56 2019 -0700 [TE] Add early terminate in detection loop --- .../detection/wrapper/AnomalyDetectorWrapper.java | 32 ++++++++++++++++++---- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java index f8ebe7c..b888379 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java @@ -76,11 +76,14 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { private static final String PROP_TIMEZONE = "timezone"; private static final String PROP_BUCKET_PERIOD = "bucketPeriod"; private static final String PROP_CACHE_PERIOD_LOOKBACK = "cachingPeriodLookback"; - private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(0); + private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(-1); private static final long CACHING_PERIOD_LOOKBACK_DAILY = TimeUnit.DAYS.toMillis(90); private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60); // disable minute level cache warm up private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1; + // fail detection job if it failed successively for the first 10 windows + private static final long EARLY_TERMINATE_WINDOW = 10; + private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class); @@ -155,8 +158,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { List<Interval> monitoringWindows = this.getMonitoringWindows(); List<MergedAnomalyResultDTO> anomalies = new ArrayList<>(); + int totalWindows = monitoringWindows.size(); int successWindows = 0; - for (int i = 0; i < monitoringWindows.size(); i++) { + Exception lastException = null; + for (int i = 0; i < totalWindows; i++) { + earlyTerminate(i, successWindows, totalWindows, lastException); + + // run detection Interval window = monitoringWindows.get(i); List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>(); try { @@ -169,16 +177,20 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { successWindows++; } catch (DetectorDataInsufficientException e) { + lastException = e; LOG.warn("[DetectionConfigID{}] Insufficient data ro run detection for window {} to {}.", this.config.getId(), window.getStart(), window.getEnd()); } catch (Exception e) { + lastException = e; LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e); } anomalies.addAll(anomaliesForOneWindow); } - if (successWindows == 0 && monitoringWindows.size() > 0) { - LOG.error("All {} detection windows failed for config {} metricUrn {}.", monitoringWindows.size(), config.getId(), metricUrn); - throw new DetectorException("All detection windows failed."); + + // throw exception if all windows failed + if (successWindows == 0 && totalWindows > 0) { + LOG.error("All {} detection windows failed for config {} metricUrn {}.", totalWindows, config.getId(), metricUrn); + throw new DetectorException("All " + totalWindows + " detection windows failed.", lastException); } for (MergedAnomalyResultDTO anomaly : anomalies) { @@ -194,6 +206,16 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { Collectors.toList()), lastTimeStamp); } + private void earlyTerminate(int currentWindows, int successWindows, int totalWindows, Exception lastException) + throws DetectorException { + // early termination if first of the EARLY_TERMINATE_WINDOW all failed + if (currentWindows == EARLY_TERMINATE_WINDOW && successWindows == 0) { + LOG.error("Successive first {} detection windows failed for config {} metricUrn {}.", EARLY_TERMINATE_WINDOW, config.getId(), metricUrn); + throw new DetectorException(String.format("Successive first %d/%d detection windows failed.", EARLY_TERMINATE_WINDOW, totalWindows), + lastException); + } + } + // guess-timate next time stamp // there are two cases. If the data is complete, next detection starts from the end time of this detection // If data is incomplete, next detection starts from the latest available data's time stamp plus the one time granularity. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
