This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 86fe034 [TE] detection - configure cache look back from yaml (#4028)
86fe034 is described below
commit 86fe034738dd881214987359f5f1825ba834573a
Author: Jihao Zhang <[email protected]>
AuthorDate: Thu Mar 28 16:48:35 2019 -0700
[TE] detection - configure cache look back from yaml (#4028)
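Add the ability to configure the cache lookback period (in milliseconds) from YAML via the `cachingPeriodLookback` property; a negative value disables cache warm-up.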
---
.../detection/wrapper/AnomalyDetectorWrapper.java | 25 ++++++++++++----------
.../yaml/CompositePipelineConfigTranslator.java | 8 +++++--
.../detection/components/MockBaselineProvider.java | 3 +--
.../components/RuleBaselineProviderTest.java | 2 +-
.../detection/spec/MockBaselineProviderSpec.java | 10 ++++-----
.../wrapper/BaselineFillingMergeWrapperTest.java | 8 ++++---
6 files changed, 32 insertions(+), 24 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 589e987..09a64bb 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -72,14 +72,14 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
private static final String PROP_DETECTOR_COMPONENT_NAME =
"detectorComponentName";
private static final String PROP_TIMEZONE = "timezone";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+ private static final String PROP_CACHE_PERIOD_LOOKBACK =
"cachingPeriodLookback";
private static final long DEFAULT_CACHING_PERIOD_LOOKBACK =
TimeUnit.DAYS.toMillis(30);
private static final long CACHING_PERIOD_LOOKBACK_DAILY =
TimeUnit.DAYS.toMillis(90);
private static final long CACHING_PERIOD_LOOKBACK_HOURLY =
TimeUnit.DAYS.toMillis(60);
// disable minute level cache warm up
- private static final long CACHING_PERIOD_LOOKBACK_MINUTELY =
TimeUnit.DAYS.toMillis(0);
+ private static final long CACHING_PERIOD_LOOKBACK_MINUTELY = -1;
- private static final Logger LOG = LoggerFactory.getLogger(
- AnomalyDetectorWrapper.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AnomalyDetectorWrapper.class);
private final String metricUrn;
private final AnomalyDetector anomalyDetector;
@@ -98,7 +98,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline
{
private final DatasetConfigDTO dataset;
private final DateTimeZone dateTimeZone;
private final Period bucketPeriod;
-
+ private final long cachingPeriodLookback;
public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO
config, long startTime, long endTime) {
super(provider, config, startTime, endTime);
@@ -134,6 +134,8 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
String bucketStr = MapUtils.getString(config.getProperties(),
PROP_BUCKET_PERIOD);
this.bucketPeriod = bucketStr == null ?
this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
+ this.cachingPeriodLookback =
config.getProperties().containsKey(PROP_CACHE_PERIOD_LOOKBACK) ?
+ MapUtils.getLong(config.getProperties(), PROP_CACHE_PERIOD_LOOKBACK) :
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
}
@Override
@@ -142,8 +144,7 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
// 1. get the last time stamp for the time series.
// 2. to calculate current values and baseline values for the anomalies
detected
// 3. anomaly detection current and baseline time series value
- long cachingPeriodLookback =
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
- if (cachingPeriodLookback > 0) {
+ if( this.cachingPeriodLookback >= 0) {
MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(),
startTime - cachingPeriodLookback, endTime,
this.metricEntity.getFilters());
this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
@@ -178,7 +179,7 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
// guess-timate next time stamp
// there are two cases. If the data is complete, next detection starts from
the end time of this detection
// If data is incomplete, next detection starts from the latest available
data's time stamp plus the one time granularity.
- long getLastTimeStamp(){
+ long getLastTimeStamp() {
long end = this.endTime;
if (this.dataset != null) {
MetricSlice metricSlice = MetricSlice.from(this.metricEntity.getId(),
@@ -216,9 +217,12 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
LOG.info("Will run detection in window {}", window);
}
// pre cache the time series for the whole detection time period
instead of fetching for each window
- long cachingPeriodLookback =
getCachingPeriodLookback(this.dataset.bucketTimeGranularity());
- MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(),
startTime - cachingPeriodLookback, endTime, this.metricEntity.getFilters(),
toTimeGranularity(this.bucketPeriod));
- this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
+ if (this.cachingPeriodLookback >= 0) {
+ MetricSlice cacheSlice =
+ MetricSlice.from(this.metricEntity.getId(), startTime -
cachingPeriodLookback, endTime,
+ this.metricEntity.getFilters(),
toTimeGranularity(this.bucketPeriod));
+ this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
+ }
return monitoringWindows;
} catch (Exception e) {
LOG.info("can't generate moving monitoring windows, calling with
single detection window", e);
@@ -327,5 +331,4 @@ public class AnomalyDetectorWrapper extends
DetectionPipeline {
return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
}
}
-
}
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 94b3443..9124da3 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -164,6 +164,7 @@ public class CompositePipelineConfigTranslator extends
YamlDetectionConfigTransl
private static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE =
"RULE_BASELINE";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
private static final String PROP_MAX_DURATION = "maxDuration";
+ private static final String PROP_CACHE_PERIOD_LOOKBACK =
"cachingPeriodLookback";
private static final DetectionRegistry DETECTION_REGISTRY =
DetectionRegistry.getInstance();
static {
@@ -266,7 +267,7 @@ public class CompositePipelineConfigTranslator extends
YamlDetectionConfigTransl
nestedProperties.put(PROP_CLASS_NAME,
AnomalyDetectorWrapper.class.getName());
String detectorKey = makeComponentKey(detectorType, name);
- fillInWindowSizeAndUnit(nestedProperties, yamlConfig, detectorType);
+ fillInDetectorWrapperProperties(nestedProperties, yamlConfig,
detectorType);
buildComponentSpec(yamlConfig, detectorType, detectorKey);
@@ -286,7 +287,7 @@ public class CompositePipelineConfigTranslator extends
YamlDetectionConfigTransl
}
// fill in window size and unit if detector requires this
- private void fillInWindowSizeAndUnit(Map<String, Object> properties,
Map<String, Object> yamlConfig, String detectorType) {
+ private void fillInDetectorWrapperProperties(Map<String, Object> properties,
Map<String, Object> yamlConfig, String detectorType) {
if (MOVING_WINDOW_DETECTOR_TYPES.contains(detectorType)) {
properties.put(PROP_MOVING_WINDOW_DETECTION, true);
switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
@@ -329,6 +330,9 @@ public class CompositePipelineConfigTranslator extends
YamlDetectionConfigTransl
if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig,
PROP_BUCKET_PERIOD));
}
+ if (yamlConfig.containsKey(PROP_CACHE_PERIOD_LOOKBACK)) {
+ properties.put(PROP_CACHE_PERIOD_LOOKBACK,
MapUtils.getString(yamlConfig, PROP_CACHE_PERIOD_LOOKBACK));
+ }
}
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
index 6f29fd3..9acc924 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockBaselineProvider.java
@@ -18,7 +18,6 @@ package org.apache.pinot.thirdeye.detection.components;
import org.apache.pinot.thirdeye.dataframe.Series;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
import org.apache.pinot.thirdeye.detection.InputDataFetcher;
import org.apache.pinot.thirdeye.detection.spec.MockBaselineProviderSpec;
import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
@@ -40,6 +39,6 @@ public class MockBaselineProvider implements
BaselineProvider<MockBaselineProvid
@Override
public Double computePredictedAggregates(MetricSlice slice,
Series.DoubleFunction aggregateFunction) {
- return this.mockSpec.getAggregates().get(slice);
+ return this.mockSpec.getBaselineAggregates().get(slice);
}
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
index 523b861..0b3917f 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
@@ -49,7 +49,7 @@ public class RuleBaselineProviderTest {
MetricSlice slice2Wow = MetricSlice.from(1L, 1537920000000L,
1538006400000L);
Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
aggregates.put(slice1Wow, DataFrame.builder(COL_TIME + ":LONG", COL_VALUE
+ ":DOUBLE")
- .append(-1, 100)
+ .append(-1, 150)
.build()
.setIndex(COL_TIME));
aggregates.put(slice2Wow, DataFrame.builder(COL_TIME + ":LONG", COL_VALUE
+ ":DOUBLE")
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
index 49f8a09..db100d2 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockBaselineProviderSpec.java
@@ -23,7 +23,7 @@ import java.util.Map;
public class MockBaselineProviderSpec extends AbstractSpec {
private Map<MetricSlice, TimeSeries> baselineTimeseries;
- private Map<MetricSlice, Double> aggregates;
+ private Map<MetricSlice, Double> baselineAggregates;
public Map<MetricSlice, TimeSeries> getBaselineTimeseries() {
return baselineTimeseries;
@@ -33,11 +33,11 @@ public class MockBaselineProviderSpec extends AbstractSpec {
this.baselineTimeseries = baselineTimeseries;
}
- public Map<MetricSlice, Double> getAggregates() {
- return aggregates;
+ public Map<MetricSlice, Double> getBaselineAggregates() {
+ return baselineAggregates;
}
- public void setAggregates(Map<MetricSlice, Double> aggregates) {
- this.aggregates = aggregates;
+ public void setBaselineAggregates(Map<MetricSlice, Double>
baselineAggregates) {
+ this.baselineAggregates = baselineAggregates;
}
}
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 77d1785..0b4c0d8 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -111,7 +111,9 @@ public class BaselineFillingMergeWrapperTest {
Collections.singletonList(new
MockPipelineOutput(Collections.singletonList(anomaly), -1L))))
.setAnomalies(Collections.emptyList())
.setMetrics(Collections.singletonList(metric))
- .setTimeseries(timeseries);
+ .setTimeseries(timeseries)
+ .setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600),
+ DataFrame.builder(COL_TIME + ":LONG", COL_VALUE +
":DOUBLE").append(3000, 100).build()));
// set up detection config properties
this.config.getProperties().put(PROP_MAX_GAP, 100);
@@ -121,7 +123,7 @@ public class BaselineFillingMergeWrapperTest {
// initialize the baseline provider
BaselineProvider baselineProvider = new MockBaselineProvider();
MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
- spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600),
100.0));
+ spec.setBaselineAggregates(ImmutableMap.of(MetricSlice.from(1, 3000,
3600), 100.0));
spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000,
3600), TimeSeries.fromDataFrame(
DataFrame.builder(COL_TIME + ":LONG", COL_VALUE +
":DOUBLE").append(3000, 100).build())));
InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider,
this.config.getId());
@@ -161,7 +163,7 @@ public class BaselineFillingMergeWrapperTest {
this.config.getProperties().put("detector", "$testDetector");
BaselineProvider baselineProvider = new MockBaselineProvider();
MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
- spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600),
100.0));
+ spec.setBaselineAggregates(ImmutableMap.of(MetricSlice.from(1, 3000,
3600), 100.0));
spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000,
3600), TimeSeries.fromDataFrame(
DataFrame.builder(COL_TIME + ":LONG", COL_VALUE +
":DOUBLE").append(3000, 100).build())));
InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider,
this.config.getId());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]