tangdian commented on a change in pull request #4067: [TE] Holt Winters detector
URL: https://github.com/apache/incubator-pinot/pull/4067#discussion_r273645935
##########
File path:
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
##########
@@ -0,0 +1,538 @@
+package org.apache.pinot.thirdeye.detection.components;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.math3.analysis.MultivariateFunction;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.LongSeries;
+import org.apache.pinot.thirdeye.dataframe.Series;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.Pattern;
+import org.apache.pinot.thirdeye.detection.algorithm.AlgorithmUtils;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.Tune;
+import org.apache.pinot.thirdeye.detection.spec.HoltWintersDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.components.Tunable;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.apache.commons.math3.optim.MaxIter;
+import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
+import org.apache.commons.math3.optim.MaxEval;
+import org.apache.commons.math3.optim.SimpleBounds;
+import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
+import org.apache.commons.math3.optim.InitialGuess;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+@Components(title = "Holt Winters triple exponential smoothing forecasting and
detection",
+ type = "HOLT_WINTERS_RULE",
+ tags = {DetectionTag.RULE_DETECTION},
+ description = "Forecast with holt winters triple exponential smoothing and
generate anomalies",
+ params = {
+ @Param(name = "alpha"),
+ @Param(name = "beta"),
+ @Param(name = "gamma"),
+ @Param(name = "period"),
+ @Param(name = "pattern"),
+ @Param(name = "sensitivity"),
+ @Param(name = "kernelSmoothing")
+ })
+public class HoltWintersDetector implements
BaselineProvider<HoltWintersDetectorSpec>,
+
AnomalyDetector<HoltWintersDetectorSpec> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoltWintersDetector.class);
+ private InputDataFetcher dataFetcher;
+ private static final String COL_CURR = "current";
+ private static final String COL_BASE = "baseline";
+ private static final String COL_ANOMALY = "anomaly";
+ private static final String COL_PATTERN = "pattern";
+ private static final String COL_DIFF = "diff";
+ private static final String COL_DIFF_VIOLATION = "diff_violation";
+ private static final String COL_ERROR = "error";
+ private static final String TRUE = "true";
+ private static final long PARTITION_PERIOD = 604800000L;
+ private static final long KERNEL_PERIOD = 3600000L;
+ private static final int LOOKBACK = 60;
+ private static final Period LOOKBACK_PERIOD =
ConfigUtils.parsePeriod("60DAYS");
+
+ private int period;
+ private double alpha;
+ private double beta;
+ private double gamma;
+ private Pattern pattern;
+ private TimeGranularity timeGranularity;
+ private double sensitivity;
+ private String monitoringGranularity;
+ private boolean kernelSmoothing;
+
+ /**
+  * Copies the detector settings out of the spec and keeps the data fetcher for later queries.
+  *
+  * @param spec detector configuration: smoothing factors, period, pattern, sensitivity,
+  *             kernel-smoothing flag and monitoring granularity
+  * @param dataFetcher fetcher this component uses to load data
+  */
+ @Override
+ public void init(HoltWintersDetectorSpec spec, InputDataFetcher dataFetcher) {
+   this.period = spec.getPeriod();
+   this.alpha = spec.getAlpha();
+   this.beta = spec.getBeta();
+   this.gamma = spec.getGamma();
+   this.dataFetcher = dataFetcher;
+
+   // The pattern name is matched against the Pattern enum after upper-casing.
+   final String patternName = spec.getPattern().toUpperCase();
+   this.pattern = Pattern.valueOf(patternName);
+
+   // The flag arrives as text; anything other than "true" (any case) disables smoothing.
+   final String smoothingFlag = spec.getKernelSmoothing();
+   this.kernelSmoothing = smoothingFlag.equalsIgnoreCase(TRUE);
+
+   this.sensitivity = spec.getSensitivity();
+
+   final String granularity = spec.getMonitoringGranularity();
+   this.monitoringGranularity = granularity;
+   // "1_MONTHS" is not parsed; the native granularity constant is used for it instead.
+   this.timeGranularity = granularity.equals("1_MONTHS")
+       ? MetricSlice.NATIVE_GRANULARITY
+       : TimeGranularity.fromString(granularity);
+ }
+
+ /**
+  * Produces the predicted time series for the given slice.
+  * Input data is fetched from LOOKBACK_PERIOD before the slice start up to the slice end and
+  * handed to computePredictionInterval together with the slice start.
+  *
+  * @param slice metric slice to compute the prediction for
+  * @return time series built from the prediction data frame; when that frame has more than
+  *         one row, its last row is dropped first
+  */
+ @Override
+ public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+   final MetricEntity entity = MetricEntity.fromSlice(slice, 0);
+   final Interval window = new Interval(slice.getStart(), slice.getEnd());
+   final long trainStartMillis = new DateTime(window.getStart()).minus(LOOKBACK_PERIOD).getMillis();
+
+   final DataFrame history = fetchDataByPart(entity, trainStartMillis, window.getEndMillis());
+   final DataFrame prediction = computePredictionInterval(history, window.getStartMillis());
+
+   // Exclude the end: a frame with zero or one row is returned as is.
+   final int rows = prediction.size();
+   if (rows <= 1) {
+     return TimeSeries.fromDataFrame(prediction);
+   }
+   return TimeSeries.fromDataFrame(prediction.head(rows - 1));
+ }
+
+ /**
+  * Runs detection for one metric over the given window.
+  *
+  * Steps, in order: fetch the dataset config for the metric; fetch data from
+  * LOOKBACK_PERIOD before the window start up to the window end; optionally smooth
+  * the values; compute the baseline with computePredictionInterval; flag rows whose
+  * absolute difference from the baseline is at least the COL_ERROR value and whose
+  * direction matches the configured pattern; turn flagged rows into anomalies.
+  *
+  * @param window detection window
+  * @param metricUrn URN of the metric, parsed with MetricEntity.fromURN
+  * @return anomalies produced by DetectionUtils.makeAnomalies from the COL_ANOMALY column
+  */
+ @Override
+ public List<MergedAnomalyResultDTO> runDetection(Interval window, String
metricUrn) {
+ MetricEntity metricEntity = MetricEntity.fromURN(metricUrn);
+ // Data is fetched starting LOOKBACK_PERIOD before the window start.
+ DateTime trainStart = new
DateTime(window.getStart()).minus(LOOKBACK_PERIOD);
+ DatasetConfigDTO datasetConfig = this.dataFetcher.fetchData(new
InputDataSpec()
+
.withMetricIdsForDataset(Collections.singleton(metricEntity.getId()))).getDatasetForMetricId()
+ .get(metricEntity.getId());
+
+ // This slice (train start to window end) is only passed to makeAnomalies below;
+ // the data itself is loaded through fetchDataByPart.
+ MetricSlice sliceData = MetricSlice.from(metricEntity.getId(),
trainStart.getMillis(), window.getEndMillis(),
+ metricEntity.getFilters(), timeGranularity);
+
+ DataFrame dfInput = fetchDataByPart(metricEntity, trainStart.getMillis(),
window.getEndMillis());
+
+ // Kernel smoothing: only when enabled in the spec and the dataset bucket unit is not DAYS.
+ if (kernelSmoothing &&
!TimeUnit.DAYS.equals(datasetConfig.bucketTimeGranularity().getUnit())) {
+ // Number of dataset buckets that fit into KERNEL_PERIOD (3600000 ms).
+ int kernelSize = (int) (KERNEL_PERIOD /
datasetConfig.bucketTimeGranularity().toMillis());
+ // A kernel of a single bucket (or less) would change nothing, so it is skipped.
+ if (kernelSize > 1) {
+ final int kernelOffset = kernelSize / 2;
+ // NOTE(review): if values() exposes the series' backing array, the writes below are
+ // visible to the slices read in later iterations - confirm this is intended.
+ double[] values = dfInput.getDoubles(COL_VALUE).values();
+ // Each window [i, i + kernelSize) writes its result at index i + kernelOffset.
+ for (int i = 0; i < values.length - kernelSize + 1; i++) {
+ values[i + kernelOffset] =
AlgorithmUtils.robustMean(dfInput.getDoubles(COL_VALUE)
+ .slice(i, i + kernelSize), kernelSize).getDouble(kernelSize - 1);
+ }
+ dfInput.addSeries(COL_VALUE, values);
+ }
+ }
+
+ // Current values and baseline side by side; COL_DIFF = current - baseline.
+ DataFrame dfCurr = new DataFrame(dfInput).renameSeries(COL_VALUE,
COL_CURR);
+ DataFrame dfBase = computePredictionInterval(dfInput,
window.getStartMillis()).renameSeries(COL_VALUE, COL_BASE);
+ DataFrame df = new DataFrame(dfCurr).addSeries(dfBase);
+ df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_BASE)));
+ df.addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false));
+
+ // consistent with pattern: UP_OR_DOWN accepts every row, UP needs diff > 0,
+ // any other pattern needs diff < 0
+ if (pattern.equals(Pattern.UP_OR_DOWN) ) {
+ df.addSeries(COL_PATTERN, BooleanSeries.fillValues(df.size(), true));
+ } else {
+ df.addSeries(COL_PATTERN, pattern.equals(Pattern.UP) ?
df.getDoubles(COL_DIFF).gt(0) :
+ df.getDoubles(COL_DIFF).lt(0));
+ }
+ // NOTE(review): COL_ERROR is not added in this method; presumably it comes from the
+ // frame returned by computePredictionInterval - confirm.
+ df.addSeries(COL_DIFF_VIOLATION,
df.getDoubles(COL_DIFF).abs().gte(df.getDoubles(COL_ERROR)));
+ // Presumably stores in COL_ANOMALY whether both COL_PATTERN and COL_DIFF_VIOLATION
+ // are true for the row - verify against DataFrame.mapInPlace.
+ df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN,
COL_DIFF_VIOLATION);
+
+ // anomalies
+ List<MergedAnomalyResultDTO> result =
DetectionUtils.makeAnomalies(sliceData, df, COL_ANOMALY,
+ window.getEndMillis(),
+ DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity,
datasetConfig), datasetConfig);
+
+ return result;
+ }
+
+ /**
+  * Break a time period into parts, fetch them, and put the results together into one DataFrame.
+  *
+  * The range is cut into consecutive slices of PARTITION_PERIOD (604800000 ms) each, followed by
+  * one slice for whatever remains up to {@code end}. All fetched frames are appended, sorted by
+  * COL_TIME, and reduced so that every timestamp appears at most once (the first row of each
+  * timestamp in sorted order is kept).
+  *
+  * @param metricEntity metric entity
+  * @param start start timestamp
+  * @param end end timestamp
+  * @return Data Frame that has data from start to end
+  */
+ private DataFrame fetchDataByPart(MetricEntity metricEntity, long start, long end) {
+   long duration = end - start;
+   List<MetricSlice> slices = new ArrayList<>();
+   long sliceStart = start;
+   int parts = (int) (duration / PARTITION_PERIOD);
+   for (int i = 0; i < parts; i++) {
+     slices.add(MetricSlice.from(metricEntity.getId(), sliceStart, sliceStart + PARTITION_PERIOD,
+         metricEntity.getFilters(), timeGranularity));
+     sliceStart += PARTITION_PERIOD;
+   }
+   // Remainder between the last full partition and the requested end.
+   slices.add(MetricSlice.from(metricEntity.getId(), sliceStart, end,
+       metricEntity.getFilters(), timeGranularity));
+
+   InputData data = this.dataFetcher.fetchData(new InputDataSpec().withTimeseriesSlices(slices)
+       .withMetricIdsForDataset(Collections.singletonList(metricEntity.getId())));
+
+   // Start from an empty frame indexed by time and append every fetched part to it.
+   DataFrame df = new DataFrame();
+   df.addSeries(COL_TIME, LongSeries.buildFrom());
+   df.addSeries(COL_VALUE, DoubleSeries.buildFrom());
+   df.setIndex(COL_TIME);
+
+   for (Map.Entry<MetricSlice, DataFrame> entry : data.getTimeseries().entrySet()) {
+     df = df.append(entry.getValue());
+   }
+   df = df.sortedBy(COL_TIME);
+
+   // Remove duplicates. After dropping row i, the following row moves into position i, so the
+   // same index has to be examined again; otherwise a run of three or more equal timestamps
+   // would leave duplicates behind.
+   long prev = -1;
+   for (int i = 0; i < df.size(); i++) {
+     long curr = df.getLongs(COL_TIME).get(i);
+     if (curr == prev) {
+       DataFrame before = df.slice(0, i);
+       DataFrame after = df.slice(i + 1, df.size());
+       df = before.append(after);
+       i--;
+       continue;
+     }
+     prev = curr;
+   }
+   return df;
+ }
+
+ /**
+ * Returns a data frame containing the same time daily data
+ * @param originalDF the original dataframe
+ * @param time the time of day
+ * @return dataframe containing same time of daily data for LOOKBACK number
of days
+ */
+ private DataFrame getDailyDF(DataFrame originalDF, Long time) {
+ LongSeries longSeries = (LongSeries) originalDF.get(COL_TIME);
+ Long start = longSeries.getLong(0);
+ DateTime dt = new DateTime(time);
+ DataFrame df = DataFrame.builder(COL_TIME, COL_VALUE).build();
+
+ for (int i = 0; i < LOOKBACK; i++) {
+ DateTime subDt = new DateTime(dt);
+ subDt = subDt.minusDays(1);
+ long t = subDt.getMillis();
Review comment:
I need to keep `subDt = subDt.minusDays(1);` because subDt has to be updated
for the code below that fetches the week-over-week data.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]