This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 86fe034  [TE] detection - configure cache look back from yaml (#4028)
86fe034 is described below

commit 86fe034738dd881214987359f5f1825ba834573a
Author: Jihao Zhang <[email protected]>
AuthorDate: Thu Mar 28 16:48:35 2019 -0700

    [TE] detection - configure cache look back from yaml (#4028)
    
    Add the ability to configure cache look back from YAML.
---
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 25 ++++++++++++----------
 .../yaml/CompositePipelineConfigTranslator.java    |  8 +++++--
 .../detection/components/MockBaselineProvider.java |  3 +--
 .../components/RuleBaselineProviderTest.java       |  2 +-
 .../detection/spec/MockBaselineProviderSpec.java   | 10 ++++-----
 .../wrapper/BaselineFillingMergeWrapperTest.java   |  8 ++++---
 6 files changed, 32 insertions(+), 24 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 589e987..09a64bb 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -72,14 +72,14 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
   private static final String PROP_DETECTOR_COMPONENT_NAME = 
"detectorComponentName";
   private static final String PROP_TIMEZONE = "timezone";
   private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+  private static final String PROP_CACHE_PERIOD_LOOKBACK = 
"cachingPeriodLookback";
   private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = 
TimeUnit.DAYS.toMillis(30);
   private static final long CACHING_PERIOD_LOOKBACK_DAILY = 
TimeUnit.DAYS.toMillis(90);
   private static final long CACHING_PERIOD_LOOKBACK_HOURLY = 
TimeUnit.DAYS.toMillis(60);
   // disable minute level cache warm up
-  private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = 
TimeUnit.DAYS.toMillis(0);
+  private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
 
-  private static final Logger LOG = LoggerFactory.getLogger(
-      AnomalyDetectorWrapper.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
 
   private final String metricUrn;
   private final AnomalyDetector anomalyDetector;
@@ -98,7 +98,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline 
{
   private final DatasetConfigDTO dataset;
   private final DateTimeZone dateTimeZone;
   private final Period bucketPeriod;
-
+  private final long cachingPeriodLookback;
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO 
config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
@@ -134,6 +134,8 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
 
     String bucketStr = MapUtils.getString(config.getProperties(), 
PROP_BUCKET_PERIOD);
     this.bucketPeriod = bucketStr == null ? 
this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
+    this.cachingPeriodLookback = 
config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ?
+        MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) : 
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
   }
 
   @Override
@@ -142,8 +144,7 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
     // 1. get the last time stamp for the time series.
     // 2. to calculate current values and  baseline values for the anomalies 
detected
     // 3. anomaly detection current and baseline time series value
-    long cachingPeriodLookback = 
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
-    if (cachingPeriodLookback > 0) {
+    if( this.cachingPeriodLookback >= 0) {
       MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), 
startTime - cachingPeriodLookback, endTime,
           this.metricEntity.getFilters());
       this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
@@ -178,7 +179,7 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
   // guess-timate next time stamp
   // there are two cases. If the data is complete, next detection starts from 
the end time of this detection
   // If data is incomplete, next detection starts from the latest available 
data's time stamp plus the one time granularity.
-  long getLastTimeStamp(){
+  long getLastTimeStamp() {
     long end = this.endTime;
     if (this.dataset != null) {
       MetricSlice metricSlice = MetricSlice.from(this.metricEntity.getId(),
@@ -216,9 +217,12 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
           LOG.info("Will run detection in window {}", window);
         }
         // pre cache the time series for the whole detection time period 
instead of fetching for each window
-        long cachingPeriodLookback = 
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
-        MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), 
startTime - cachingPeriodLookback, endTime, this.metricEntity.getFilters(), 
toTimeGranularity(this.bucketPeriod));
-        this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
+        if (this.cachingPeriodLookback >= 0) {
+          MetricSlice cacheSlice =
+              MetricSlice.from(this.metricEntity.getId(), startTime - 
cachingPeriodLookback, endTime,
+                  this.metricEntity.getFilters(), 
toTimeGranularity(this.bucketPeriod));
+          this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
+        }
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with 
single detection window", e);
@@ -327,5 +331,4 @@ public class AnomalyDetectorWrapper extends 
DetectionPipeline {
       return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
     }
   }
-
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 94b3443..9124da3 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -164,6 +164,7 @@ public class CompositePipelineConfigTranslator extends 
YamlDetectionConfigTransl
   private static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = 
"RULE_BASELINE";
   private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
   private static final String PROP_MAX_DURATION = "maxDuration";
+  private static final String PROP_CACHE_PERIOD_LOOKBACK = 
"cachingPeriodLookback";
 
   private static final DetectionRegistry DETECTION_REGISTRY = 
DetectionRegistry.getInstance();
   static {
@@ -266,7 +267,7 @@ public class CompositePipelineConfigTranslator extends 
YamlDetectionConfigTransl
     nestedProperties.put(PROP_CLASS_NAME, 
AnomalyDetectorWrapper.class.getName());
     String detectorKey = makeComponentKey(detectorType, name);
 
-    fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
+    fillInDetectorWrapperProperties(nestedProperties, yamlConfig, 
detectorType);
 
     buildComponentSpec(yamlConfig, detectorType, detectorKey);
 
@@ -286,7 +287,7 @@ public class CompositePipelineConfigTranslator extends 
YamlDetectionConfigTransl
   }
 
   // fill in window size and unit if detector requires this
-  private void fillInWindowSizeAndUnit(Map<String, Object> properties, 
Map<String, Object> yamlConfig, String detectorType) {
+  private void fillInDetectorWrapperProperties(Map<String, Object> properties, 
Map<String, Object> yamlConfig, String detectorType) {
     if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
       properties.put(PROP_MOVING_WINDOW_DETECTION, true);
       switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
@@ -329,6 +330,9 @@ public class CompositePipelineConfigTranslator extends 
YamlDetectionConfigTransl
       if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
         properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, 
PROP_BUCKET_PERIOD));
       }
+      if (yamlConfig.containsKey(PROP_CACHE_PERIOD_LOOKBACK)) {
+        properties.put(PROP_CACHE_PERIOD_LOOKBACK, 
MapUtils.getString(yamlConfig, PROP_CACHE_PERIOD_LOOKBACK));
+      }
     }
   }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
index 6f29fd3..9acc924 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
@@ -18,7 +18,6 @@ package org.apache.pinot.thirdeye.detection.components;
 
 import org.apache.pinot.thirdeye.dataframe.Series;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
 import org.apache.pinot.thirdeye.detection.InputDataFetcher;
 import org.apache.pinot.thirdeye.detection.spec.MockBaselineProviderSpec;
 import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
@@ -40,6 +39,6 @@ public class MockBaselineProvider implements 
BaselineProvider<MockBaselineProvid
 
   @Override
   public Double computePredictedAggregates(MetricSlice slice, 
Series.DoubleFunction aggregateFunction) {
-    return this.mockSpec.getAggregates().get(slice);
+    return this.mockSpec.getBaselineAggregates().get(slice);
   }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
index 523b861..0b3917f 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
@@ -49,7 +49,7 @@ public class RuleBaselineProviderTest {
     MetricSlice slice2Wow = MetricSlice.from(1L, 1537920000000L, 
1538006400000L);
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
     aggregates.put(slice1Wow, DataFrame.builder(COL_TIME + ":LONG", COL_VALUE 
+ ":DOUBLE")
-        .append(-1, 100)
+        .append(-1, 150)
         .build()
         .setIndex(COL_TIME));
     aggregates.put(slice2Wow, DataFrame.builder(COL_TIME + ":LONG", COL_VALUE 
+ ":DOUBLE")
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
index 49f8a09..db100d2 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 public class MockBaselineProviderSpec extends AbstractSpec {
   private Map<MetricSlice, TimeSeries> baselineTimeseries;
-  private Map<MetricSlice, Double> aggregates;
+  private Map<MetricSlice, Double> baselineAggregates;
 
   public Map<MetricSlice, TimeSeries> getBaselineTimeseries() {
     return baselineTimeseries;
@@ -33,11 +33,11 @@ public class MockBaselineProviderSpec extends AbstractSpec {
     this.baselineTimeseries = baselineTimeseries;
   }
 
-  public Map<MetricSlice, Double> getAggregates() {
-    return aggregates;
+  public Map<MetricSlice, Double> getBaselineAggregates() {
+    return baselineAggregates;
   }
 
-  public void setAggregates(Map<MetricSlice, Double> aggregates) {
-    this.aggregates = aggregates;
+  public void setBaselineAggregates(Map<MetricSlice, Double> 
baselineAggregates) {
+    this.baselineAggregates = baselineAggregates;
   }
 }
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 77d1785..0b4c0d8 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -111,7 +111,9 @@ public class BaselineFillingMergeWrapperTest {
         Collections.singletonList(new 
MockPipelineOutput(Collections.singletonList(anomaly), -1L))))
         .setAnomalies(Collections.emptyList())
         .setMetrics(Collections.singletonList(metric))
-        .setTimeseries(timeseries);
+        .setTimeseries(timeseries)
+        .setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600),
+            DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + 
":DOUBLE").append(3000, 100).build()));
 
     // set up detection config properties
     this.config.getProperties().put(PROP_MAX_GAP, 100);
@@ -121,7 +123,7 @@ public class BaselineFillingMergeWrapperTest {
     // initialize the baseline provider
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
-    spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 
100.0));
+    spec.setBaselineAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 
3600), 100.0));
     spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 
3600), TimeSeries.fromDataFrame(
         DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + 
":DOUBLE").append(3000, 100).build())));
     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, 
this.config.getId());
@@ -161,7 +163,7 @@ public class BaselineFillingMergeWrapperTest {
     this.config.getProperties().put("detector", "$testDetector");
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
-    spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 
100.0));
+    spec.setBaselineAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 
3600), 100.0));
     spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 
3600), TimeSeries.fromDataFrame(
         DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + 
":DOUBLE").append(3000, 100).build())));
     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, 
this.config.getId());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to