This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch speed_up_minute_level_detection in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d0cb6f3856f7a944c86cf3f8cc671e90658a07bf Author: Xiaohui Sun <[email protected]> AuthorDate: Mon Apr 1 23:02:41 2019 -0700 [TE] Speed up minute level detection --- .../pinot/thirdeye/detection/alert/AlertUtils.java | 1 - .../detection/wrapper/AnomalyDetectorWrapper.java | 46 ++++++++++++++-------- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java index 8e6adf3..0ec9e67 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java @@ -26,7 +26,6 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; -import com.mysql.jdbc.StringUtils; import java.util.Collection; import java.util.Collections; import java.util.HashMap; diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java index b888379..71e17f0 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java @@ -81,8 +81,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { private static final long CACHING_PERIOD_LOOKBACK_HOURLY = TimeUnit.DAYS.toMillis(60); // disable minute level cache warm up private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1; - // fail detection job if it failed successively for the first 10 windows - private static final long EARLY_TERMINATE_WINDOW = 10; + // fail detection job if it failed successively for the first 3 windows + private static final long EARLY_TERMINATE_WINDOW = 3; private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetectorWrapper.class); @@ -92,18 +92,18 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { private final int windowDelay; private final TimeUnit windowDelayUnit; - private final int windowSize; - private final TimeUnit windowUnit; + private int windowSize; + private TimeUnit windowUnit; private final MetricConfigDTO metric; private final MetricEntity metricEntity; private final boolean isMovingWindowDetection; // need to specify run frequency for minute level detection. Used for moving monitoring window alignment, default to be 15 minutes. private final TimeGranularity functionFrequency; private final String detectorName; - private final long windowSizeMillis; + private long windowSizeMillis; private final DatasetConfigDTO dataset; private final DateTimeZone dateTimeZone; - private final Period bucketPeriod; + private Period bucketPeriod; private final long cachingPeriodLookback; public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) { @@ -142,6 +142,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { this.bucketPeriod = bucketStr == null ? this.getBucketSizePeriodForDataset() : Period.parse(bucketStr); this.cachingPeriodLookback = config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ? MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) : getCachingPeriodLookback(this.dataset.bucketTimeGranularity()); + + speedUpMinuteLevelDetection(); } @Override @@ -168,12 +170,13 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { Interval window = monitoringWindows.get(i); List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>(); try { - LOG.info("[Pipeline] running detection for config {} metricUrn {} window ({}/{}) - start {} end {}", - config.getId(), metricUrn, i, monitoringWindows.size(), window.getStart(), window.getEnd()); + LOG.info("[Pipeline] start detection for config {} metricUrn {} window ({}/{}) - start {} end {}", + config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd()); long ts = System.currentTimeMillis(); anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn); - LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies", - i, monitoringWindows.size(), window.getStart(), window.getEnd(), System.currentTimeMillis() - ts, anomaliesForOneWindow.size()); + LOG.info("[Pipeline] end detection for config {} metricUrn {} window ({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies", + config.getId(), metricUrn, i + 1, monitoringWindows.size(), window.getStart(), window.getEnd(), + System.currentTimeMillis() - ts, anomaliesForOneWindow.size()); successWindows++; } catch (DetectorDataInsufficientException e) { @@ -256,13 +259,6 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { for (Interval window : monitoringWindows){ LOG.info("Will run detection in window {}", window); } - // pre cache the time series for the whole detection time period instead of fetching for each window - if (this.cachingPeriodLookback >= 0) { - MetricSlice cacheSlice = - MetricSlice.from(this.metricEntity.getId(), startTime - cachingPeriodLookback, endTime, - this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod)); - this.provider.fetchTimeseries(Collections.singleton(cacheSlice)); - } return monitoringWindows; } catch (Exception e) { LOG.info("can't generate moving monitoring windows, calling with single detection window", e); @@ -371,4 +367,20 @@ public class AnomalyDetectorWrapper extends DetectionPipeline { return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS); } } + + // speed up minute level if detection window is larger than 1 hour + private void speedUpMinuteLevelDetection() { + if (bucketPeriod.getMinutes() <= 5) { + if (endTime - startTime >= Period.days(1).getMillis()) { + bucketPeriod = Period.days(1); + windowSize = 1; + windowUnit = TimeUnit.DAYS; + } else if (endTime - startTime >= Period.hours(1).getMillis()) { + bucketPeriod = Period.hours(1); + windowSize = 1; + windowUnit = TimeUnit.HOURS; + } + windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
