This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch speed_up_minute_level_detection
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d0cb6f3856f7a944c86cf3f8cc671e90658a07bf
Author: Xiaohui Sun <[email protected]>
AuthorDate: Mon Apr 1 23:02:41 2019 -0700

    [TE] Speed up minute level detection
---
 .../pinot/thirdeye/detection/alert/AlertUtils.java |  1 -
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 46 ++++++++++++++--------
 2 files changed, 29 insertions(+), 18 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
index 8e6adf3..0ec9e67 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/AlertUtils.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import com.mysql.jdbc.StringUtils;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index b888379..71e17f0 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -81,8 +81,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline 
{
   private static final long CACHING_PERIOD_LOOKBACK_HOURLY = 
TimeUnit.DAYS.toMillis(60);
   // disable minute level cache warm up
   private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
-  // fail detection job if it failed successively for the first 10 windows
-  private static final long EARLY_TERMINATE_WINDOW = 10;
+  // fail detection job if it failed successively for the first 3 windows
+  private static final long EARLY_TERMINATE_WINDOW = 3;
 
 
   private static final Logger LOG = 
LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
@@ -92,18 +92,18 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
 
   private final int windowDelay;
   private final TimeUnit windowDelayUnit;
-  private final int windowSize;
-  private final TimeUnit windowUnit;
+  private int windowSize;
+  private TimeUnit windowUnit;
   private final MetricConfigDTO metric;
   private final MetricEntity metricEntity;
   private final boolean isMovingWindowDetection;
   // need to specify run frequency for minute level detection. Used for moving 
monitoring window alignment, default to be 15 minutes.
   private final TimeGranularity functionFrequency;
   private final String detectorName;
-  private final long windowSizeMillis;
+  private long windowSizeMillis;
   private final DatasetConfigDTO dataset;
   private final DateTimeZone dateTimeZone;
-  private final Period bucketPeriod;
+  private Period bucketPeriod;
   private final long cachingPeriodLookback;
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO 
config, long startTime, long endTime) {
@@ -142,6 +142,8 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
     this.bucketPeriod = bucketStr == null ? 
this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
     this.cachingPeriodLookback = 
config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ?
         MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) : 
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
+
+    speedUpMinuteLevelDetection();
   }
 
   @Override
@@ -168,12 +170,13 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
       Interval window = monitoringWindows.get(i);
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
-        LOG.info("[Pipeline] running detection for config {} metricUrn {} 
window ({}/{}) - start {} end {}",
-            config.getId(), metricUrn, i, monitoringWindows.size(), 
window.getStart(), window.getEnd());
+        LOG.info("[Pipeline] start detection for config {} metricUrn {} window 
({}/{}) - start {} end {}",
+            config.getId(), metricUrn, i + 1, monitoringWindows.size(), 
window.getStart(), window.getEnd());
         long ts = System.currentTimeMillis();
         anomaliesForOneWindow = anomalyDetector.runDetection(window, 
this.metricUrn);
-        LOG.info("[Pipeline] run anomaly detection for window ({}/{}) - start 
{} end {}  used {} milliseconds, detected {} anomalies",
-            i, monitoringWindows.size(), window.getStart(), window.getEnd(), 
System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
+        LOG.info("[Pipeline] end detection for config {} metricUrn {} window 
({}/{}) - start {} end {} used {} milliseconds, detected {} anomalies",
+            config.getId(), metricUrn, i + 1, monitoringWindows.size(), 
window.getStart(), window.getEnd(),
+            System.currentTimeMillis() - ts, anomaliesForOneWindow.size());
         successWindows++;
       }
       catch (DetectorDataInsufficientException e) {
@@ -256,13 +259,6 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
         for (Interval window : monitoringWindows){
           LOG.info("Will run detection in window {}", window);
         }
-        // pre cache the time series for the whole detection time period 
instead of fetching for each window
-        if (this.cachingPeriodLookback >= 0) {
-          MetricSlice cacheSlice =
-              MetricSlice.from(this.metricEntity.getId(), startTime - 
cachingPeriodLookback, endTime,
-                  this.metricEntity.getFilters(), 
toTimeGranularity(this.bucketPeriod));
-          this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
-        }
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with 
single detection window", e);
@@ -371,4 +367,20 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
       return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
     }
   }
+
+  /**
+   * Speeds up minute level detection when the detection range is at least 1 hour.
+   *
+   * <p>For a detection range of at least 1 day the bucket period and moving window become 1 day;
+   * for a range of at least 1 hour they become 1 hour; shorter ranges keep the configured values.
+   * {@code windowSizeMillis} is then recomputed from the resulting window size and unit.
+   */
+  private void speedUpMinuteLevelDetection() {
+    // NOTE(review): Joda Period#getMinutes() reads only the minutes field, so a period such as
+    // PT1H (0 minutes) also enters this branch - confirm whether that is intended.
+    if (bucketPeriod.getMinutes() <= 5) {
+      long detectionRangeMillis = endTime - startTime;
+      // Period.days(1).getMillis() / Period.hours(1).getMillis() return the millis field (0), not
+      // the length of the period, so the daily branch always won; compare real durations instead.
+      if (detectionRangeMillis >= TimeUnit.DAYS.toMillis(1)) {
+        bucketPeriod = Period.days(1);
+        windowSize = 1;
+        windowUnit = TimeUnit.DAYS;
+      } else if (detectionRangeMillis >= TimeUnit.HOURS.toMillis(1)) {
+        bucketPeriod = Period.hours(1);
+        windowSize = 1;
+        windowUnit = TimeUnit.HOURS;
+      }
+      windowSizeMillis = TimeUnit.MILLISECONDS.convert(windowSize, windowUnit);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to