tangdian commented on a change in pull request #4067: [TE] Holt Winters detector
URL: https://github.com/apache/incubator-pinot/pull/4067#discussion_r273631601
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
 ##########
 @@ -0,0 +1,538 @@
+package org.apache.pinot.thirdeye.detection.components;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.math3.analysis.MultivariateFunction;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.LongSeries;
+import org.apache.pinot.thirdeye.dataframe.Series;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.Pattern;
+import org.apache.pinot.thirdeye.detection.algorithm.AlgorithmUtils;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.Tune;
+import org.apache.pinot.thirdeye.detection.spec.HoltWintersDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.components.Tunable;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.apache.commons.math3.optim.MaxIter;
+import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
+import org.apache.commons.math3.optim.MaxEval;
+import org.apache.commons.math3.optim.SimpleBounds;
+import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
+import org.apache.commons.math3.optim.InitialGuess;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+@Components(title = "Holt Winters triple exponential smoothing forecasting and 
detection",
+    type = "HOLT_WINTERS_RULE",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Forecast with holt winters triple exponential smoothing and 
generate anomalies",
+    params = {
+        @Param(name = "alpha"),
+        @Param(name = "beta"),
+        @Param(name = "gamma"),
+        @Param(name = "period"),
+        @Param(name = "pattern"),
+        @Param(name = "sensitivity"),
+        @Param(name = "kernelSmoothing")
+    })
+public class HoltWintersDetector implements 
BaselineProvider<HoltWintersDetectorSpec>,
+                                            
AnomalyDetector<HoltWintersDetectorSpec> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoltWintersDetector.class);
+  private InputDataFetcher dataFetcher; // assigned in init; used for all data fetches
+  private static final String COL_CURR = "current"; // observed values in runDetection
+  private static final String COL_BASE = "baseline"; // predicted values in runDetection
+  private static final String COL_ANOMALY = "anomaly"; // final per-row anomaly flag
+  private static final String COL_PATTERN = "pattern"; // true where diff sign matches the pattern
+  private static final String COL_DIFF = "diff"; // current minus baseline
+  private static final String COL_DIFF_VIOLATION = "diff_violation"; // |diff| >= error
+  private static final String COL_ERROR = "error"; // read in runDetection; presumably produced by computePredictionInterval - confirm
+  private static final String TRUE = "true"; // compared case-insensitively with the spec's kernelSmoothing value
+  private static final long PARTITION_PERIOD = 604800000L; // 7 days in milliseconds; slice width in fetchDataByPart
+  private static final long KERNEL_PERIOD = 3600000L; // 1 hour in milliseconds; divided by the bucket size to get the kernel size
+  private static final int LOOKBACK = 60; // loop count in getDailyDF
+  private static final Period LOOKBACK_PERIOD = 
ConfigUtils.parsePeriod("60DAYS"); // training history fetched before the window start
+
+  private int period; // copied from the spec in init
+  private double alpha; // copied from the spec in init
+  private double beta; // copied from the spec in init
+  private double gamma; // copied from the spec in init
+  private Pattern pattern; // parsed from the spec's upper-cased pattern string
+  private TimeGranularity timeGranularity; // granularity passed to MetricSlice.from
+  private double sensitivity; // copied from the spec in init
+  private String monitoringGranularity; // raw granularity string from the spec
+  private boolean kernelSmoothing; // enables the smoothing step in runDetection
+
+  @Override
+  public void init(HoltWintersDetectorSpec spec, InputDataFetcher dataFetcher) 
{
+    this.period = spec.getPeriod();
+    this.alpha = spec.getAlpha();
+    this.beta = spec.getBeta();
+    this.gamma = spec.getGamma();
+    this.dataFetcher = dataFetcher;
+    this.pattern = Pattern.valueOf(spec.getPattern().toUpperCase());
+    this.kernelSmoothing = spec.getKernelSmoothing().equalsIgnoreCase(TRUE);
+
+    this.sensitivity = spec.getSensitivity();
+    this.monitoringGranularity = spec.getMonitoringGranularity();
+    if (this.monitoringGranularity.equals("1_MONTHS")) {
+      this.timeGranularity = MetricSlice.NATIVE_GRANULARITY;
+    } else {
+      this.timeGranularity = 
TimeGranularity.fromString(spec.getMonitoringGranularity());
+    }
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    MetricEntity metricEntity = MetricEntity.fromSlice(slice, 0);
+    Interval window = new Interval(slice.getStart(), slice.getEnd());
+    DateTime trainStart = new 
DateTime(window.getStart()).minus(LOOKBACK_PERIOD);
+    DataFrame inputDf = fetchDataByPart(metricEntity, trainStart.getMillis(), 
window.getEndMillis());
+    DataFrame resultDF = computePredictionInterval(inputDf,
+        window.getStartMillis());
+
+    // Exclude the end
+    if (resultDF.size() > 1) {
+      resultDF = resultDF.head(resultDF.size() - 1);
+    }
+
+    return TimeSeries.fromDataFrame(resultDF);
+  }
+
+  @Override
+  public List<MergedAnomalyResultDTO> runDetection(Interval window, String 
metricUrn) {
+    MetricEntity metricEntity = MetricEntity.fromURN(metricUrn);
+    DateTime trainStart = new 
DateTime(window.getStart()).minus(LOOKBACK_PERIOD);
+    DatasetConfigDTO datasetConfig = this.dataFetcher.fetchData(new 
InputDataSpec()
+        
.withMetricIdsForDataset(Collections.singleton(metricEntity.getId()))).getDatasetForMetricId()
+        .get(metricEntity.getId());
+
+    MetricSlice sliceData = MetricSlice.from(metricEntity.getId(), 
trainStart.getMillis(), window.getEndMillis(),
+        metricEntity.getFilters(), timeGranularity);
+
+    DataFrame dfInput = fetchDataByPart(metricEntity, trainStart.getMillis(), 
window.getEndMillis());
+
+    // Kernel smoothing
+    if (kernelSmoothing && 
!TimeUnit.DAYS.equals(datasetConfig.bucketTimeGranularity().getUnit())) {
+      int kernelSize = (int) (KERNEL_PERIOD / 
datasetConfig.bucketTimeGranularity().toMillis());
+      // kernel smoothing
+      if (kernelSize > 1) {
+        final int kernelOffset = kernelSize / 2;
+        double[] values = dfInput.getDoubles(COL_VALUE).values();
+        for (int i = 0; i < values.length - kernelSize + 1; i++) {
+          values[i + kernelOffset] = 
AlgorithmUtils.robustMean(dfInput.getDoubles(COL_VALUE)
+              .slice(i, i + kernelSize), kernelSize).getDouble(kernelSize - 1);
+        }
+        dfInput.addSeries(COL_VALUE, values);
+      }
+    }
+
+    DataFrame dfCurr = new DataFrame(dfInput).renameSeries(COL_VALUE, 
COL_CURR);
+    DataFrame dfBase = computePredictionInterval(dfInput, 
window.getStartMillis()).renameSeries(COL_VALUE, COL_BASE);
+    DataFrame df = new DataFrame(dfCurr).addSeries(dfBase);
+    df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_BASE)));
+    df.addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false));
+
+    // consistent with pattern
+    if (pattern.equals(Pattern.UP_OR_DOWN) ) {
+      df.addSeries(COL_PATTERN, BooleanSeries.fillValues(df.size(), true));
+    } else {
+      df.addSeries(COL_PATTERN, pattern.equals(Pattern.UP) ? 
df.getDoubles(COL_DIFF).gt(0) :
+          df.getDoubles(COL_DIFF).lt(0));
+    }
+    df.addSeries(COL_DIFF_VIOLATION, 
df.getDoubles(COL_DIFF).abs().gte(df.getDoubles(COL_ERROR)));
+    df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN, 
COL_DIFF_VIOLATION);
+
+    // anomalies
+    List<MergedAnomalyResultDTO> result = 
DetectionUtils.makeAnomalies(sliceData, df, COL_ANOMALY,
+        window.getEndMillis(),
+        DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, 
datasetConfig), datasetConfig);
+
+    return result;
+  }
+
+  /**
+   * Break a time period into parts and fetch it one by one, and put it
+   * together into a DataFrame.
+   *
+   * <p>The range is cut into PARTITION_PERIOD-wide slices plus one shorter
+   * trailing slice for the remainder. The fetched frames are appended,
+   * sorted by time, and rows with a repeated timestamp are dropped so that
+   * only the first row per timestamp is kept.
+   *
+   * @param metricEntity metric entity
+   * @param start start timestamp
+   * @param end end timestamp
+   * @return Data Frame that has data from start to end
+   */
+  private DataFrame fetchDataByPart(MetricEntity metricEntity, long start,
+      long end) {
+    List<MetricSlice> slices = new ArrayList<>();
+    long cursor = start;
+    int parts = (int) ((end - start) / PARTITION_PERIOD);
+    for (int i = 0; i < parts; i++) {
+      slices.add(MetricSlice.from(metricEntity.getId(), cursor,
+          cursor + PARTITION_PERIOD, metricEntity.getFilters(),
+          timeGranularity));
+      cursor += PARTITION_PERIOD;
+    }
+    // Trailing remainder. Skip it when the range is an exact multiple of
+    // PARTITION_PERIOD (it would be a zero-length slice), but always keep
+    // at least one slice so the request is never empty.
+    if (cursor < end || slices.isEmpty()) {
+      slices.add(MetricSlice.from(metricEntity.getId(), cursor, end,
+          metricEntity.getFilters(), timeGranularity));
+    }
+
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withTimeseriesSlices(slices)
+        .withMetricIdsForDataset(
+            Collections.singletonList(metricEntity.getId())));
+
+    // Start from an empty frame so an empty fetch still yields both columns.
+    DataFrame df = new DataFrame();
+    df.addSeries(COL_TIME, LongSeries.buildFrom());
+    df.addSeries(COL_VALUE, DoubleSeries.buildFrom());
+    df.setIndex(COL_TIME);
+
+    for (Map.Entry<MetricSlice, DataFrame> entry
+        : data.getTimeseries().entrySet()) {
+      df = df.append(entry.getValue());
+    }
+    df = df.sortedBy(COL_TIME);
+
+    // Remove duplicates. After dropping row i the index is NOT advanced, so
+    // the row that moved into position i is compared as well; this handles
+    // runs of three or more equal timestamps.
+    int i = 1;
+    while (i < df.size()) {
+      long prevTime = df.getLongs(COL_TIME).get(i - 1);
+      long currTime = df.getLongs(COL_TIME).get(i);
+      if (currTime == prevTime) {
+        df = df.slice(0, i).append(df.slice(i + 1, df.size()));
+      } else {
+        i++;
+      }
+    }
+    return df;
+  }
+
+  /**
+   * Returns a data frame containing the same time daily data
+   * @param originalDF the original dataframe
+   * @param time the time of day
+   * @return dataframe containing same time of daily data for LOOKBACK number 
of days
+   */
+  private DataFrame getDailyDF(DataFrame originalDF, Long time) {
+    LongSeries longSeries = (LongSeries) originalDF.get(COL_TIME);
+    Long start = longSeries.getLong(0);
+    DateTime dt = new DateTime(time);
+    DataFrame df = DataFrame.builder(COL_TIME, COL_VALUE).build();
+
+    for (int i = 0; i < LOOKBACK; i++) {
+      DateTime subDt = new DateTime(dt);
+      subDt = subDt.minusDays(1);
+      long t = subDt.getMillis();
 
 Review comment:
   I had to write `subDt = subDt.minusDays(1)` because Joda-Time's `DateTime` 
is immutable: `minusDays(1)` returns a new instance and does not mutate `subDt`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to