tangdian commented on a change in pull request #4067: [TE] Holt Winters detector
URL: https://github.com/apache/incubator-pinot/pull/4067#discussion_r273635906
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
 ##########
 @@ -0,0 +1,538 @@
+package org.apache.pinot.thirdeye.detection.components;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.math3.analysis.MultivariateFunction;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.LongSeries;
+import org.apache.pinot.thirdeye.dataframe.Series;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.Pattern;
+import org.apache.pinot.thirdeye.detection.algorithm.AlgorithmUtils;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.annotation.Tune;
+import org.apache.pinot.thirdeye.detection.spec.HoltWintersDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.components.Tunable;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.apache.commons.math3.optim.MaxIter;
+import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
+import org.apache.commons.math3.optim.MaxEval;
+import org.apache.commons.math3.optim.SimpleBounds;
+import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
+import org.apache.commons.math3.optim.InitialGuess;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
+@Components(title = "Holt Winters triple exponential smoothing forecasting and 
detection",
+    type = "HOLT_WINTERS_RULE",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Forecast with holt winters triple exponential smoothing and 
generate anomalies",
+    params = {
+        @Param(name = "alpha"),
+        @Param(name = "beta"),
+        @Param(name = "gamma"),
+        @Param(name = "period"),
+        @Param(name = "pattern"),
+        @Param(name = "sensitivity"),
+        @Param(name = "kernelSmoothing")
+    })
+public class HoltWintersDetector implements 
BaselineProvider<HoltWintersDetectorSpec>,
+                                            
AnomalyDetector<HoltWintersDetectorSpec> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoltWintersDetector.class);
+  private InputDataFetcher dataFetcher;
+  private static final String COL_CURR = "current";
+  private static final String COL_BASE = "baseline";
+  private static final String COL_ANOMALY = "anomaly";
+  private static final String COL_PATTERN = "pattern";
+  private static final String COL_DIFF = "diff";
+  private static final String COL_DIFF_VIOLATION = "diff_violation";
+  private static final String COL_ERROR = "error";
+  private static final String TRUE = "true";
+  private static final long PARTITION_PERIOD = 604800000L;
+  private static final long KERNEL_PERIOD = 3600000L;
+  private static final int LOOKBACK = 60;
+  private static final Period LOOKBACK_PERIOD = 
ConfigUtils.parsePeriod("60DAYS");
+
+  private int period;
+  private double alpha;
+  private double beta;
+  private double gamma;
+  private Pattern pattern;
+  private TimeGranularity timeGranularity;
+  private double sensitivity;
+  private String monitoringGranularity;
+  private boolean kernelSmoothing;
+
+  /**
+   * Copies the detector configuration out of the spec and keeps the data fetcher for later queries.
+   *
+   * @param spec detector spec providing the smoothing parameters, period, pattern, sensitivity,
+   *             kernel smoothing flag and monitoring granularity
+   * @param dataFetcher fetcher used by this component to load time series and dataset configs
+   */
+  @Override
+  public void init(HoltWintersDetectorSpec spec, InputDataFetcher dataFetcher) {
+    // smoothing parameters and season length
+    this.period = spec.getPeriod();
+    this.alpha = spec.getAlpha();
+    this.beta = spec.getBeta();
+    this.gamma = spec.getGamma();
+
+    this.dataFetcher = dataFetcher;
+
+    // detection behaviour
+    this.pattern = Pattern.valueOf(spec.getPattern().toUpperCase());
+    this.kernelSmoothing = spec.getKernelSmoothing().equalsIgnoreCase(TRUE);
+    this.sensitivity = spec.getSensitivity();
+
+    // "1_MONTHS" is mapped to the native granularity of the metric; anything else is parsed as given
+    this.monitoringGranularity = spec.getMonitoringGranularity();
+    final boolean monthly = this.monitoringGranularity.equals("1_MONTHS");
+    this.timeGranularity = monthly
+        ? MetricSlice.NATIVE_GRANULARITY
+        : TimeGranularity.fromString(spec.getMonitoringGranularity());
+  }
+
+  /**
+   * Computes the Holt-Winters baseline for the time range covered by the given slice.
+   *
+   * Training data is fetched starting LOOKBACK_PERIOD before the slice start. When the forecast
+   * has more than one row, its last row is dropped so the end of the range is excluded.
+   *
+   * @param slice metric slice to predict for
+   * @return time series built from the forecast data frame
+   */
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    final MetricEntity entity = MetricEntity.fromSlice(slice, 0);
+    final Interval predictionWindow = new Interval(slice.getStart(), slice.getEnd());
+
+    final long trainStartMillis = new DateTime(predictionWindow.getStart()).minus(LOOKBACK_PERIOD).getMillis();
+    final DataFrame trainingData = fetchDataByPart(entity, trainStartMillis, predictionWindow.getEndMillis());
+
+    DataFrame prediction = computePredictionInterval(trainingData, predictionWindow.getStartMillis());
+
+    // Exclude the end
+    final int rows = prediction.size();
+    if (rows > 1) {
+      prediction = prediction.head(rows - 1);
+    }
+
+    return TimeSeries.fromDataFrame(prediction);
+  }
+
+  /**
+   * Runs Holt-Winters detection for one metric over the given window.
+   *
+   * Data starting LOOKBACK_PERIOD before the window is fetched, optionally kernel-smoothed, and
+   * compared against the forecast from computePredictionInterval. COL_PATTERN marks points whose
+   * deviation has the configured direction, COL_DIFF_VIOLATION marks points whose absolute
+   * deviation is at least the error bound, and COL_ANOMALY is derived from both via
+   * BooleanSeries.ALL_TRUE.
+   *
+   * @param window detection window
+   * @param metricUrn URN of the metric to run detection on
+   * @return anomalies produced by DetectionUtils.makeAnomalies from the COL_ANOMALY column
+   */
+  @Override
+  public List<MergedAnomalyResultDTO> runDetection(Interval window, String metricUrn) {
+    final MetricEntity entity = MetricEntity.fromURN(metricUrn);
+    final DateTime trainStart = new DateTime(window.getStart()).minus(LOOKBACK_PERIOD);
+
+    final InputDataSpec datasetSpec =
+        new InputDataSpec().withMetricIdsForDataset(Collections.singleton(entity.getId()));
+    final DatasetConfigDTO datasetConfig =
+        this.dataFetcher.fetchData(datasetSpec).getDatasetForMetricId().get(entity.getId());
+
+    final MetricSlice trainingSlice = MetricSlice.from(entity.getId(), trainStart.getMillis(),
+        window.getEndMillis(), entity.getFilters(), timeGranularity);
+
+    final DataFrame dfInput = fetchDataByPart(entity, trainStart.getMillis(), window.getEndMillis());
+
+    // Kernel smoothing, skipped for datasets with daily buckets
+    if (kernelSmoothing && !TimeUnit.DAYS.equals(datasetConfig.bucketTimeGranularity().getUnit())) {
+      // number of buckets that fit into one KERNEL_PERIOD
+      final int kernelSize = (int) (KERNEL_PERIOD / datasetConfig.bucketTimeGranularity().toMillis());
+      if (kernelSize > 1) {
+        final int kernelOffset = kernelSize / 2;
+        final double[] values = dfInput.getDoubles(COL_VALUE).values();
+        final int windowCount = values.length - kernelSize + 1;
+        // NOTE(review): the series is re-sliced on every step while `values` is being written;
+        // if values() exposes the backing array, earlier results feed later windows - confirm.
+        for (int from = 0; from < windowCount; from++) {
+          values[from + kernelOffset] = AlgorithmUtils
+              .robustMean(dfInput.getDoubles(COL_VALUE).slice(from, from + kernelSize), kernelSize)
+              .getDouble(kernelSize - 1);
+        }
+        dfInput.addSeries(COL_VALUE, values);
+      }
+    }
+
+    // current values next to the forecast baseline
+    final DataFrame dfCurr = new DataFrame(dfInput).renameSeries(COL_VALUE, COL_CURR);
+    final DataFrame dfBase =
+        computePredictionInterval(dfInput, window.getStartMillis()).renameSeries(COL_VALUE, COL_BASE);
+    final DataFrame df = new DataFrame(dfCurr).addSeries(dfBase);
+    df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_BASE)));
+    df.addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false));
+
+    // consistent with pattern
+    final Series patternMatch;
+    if (pattern.equals(Pattern.UP_OR_DOWN)) {
+      patternMatch = BooleanSeries.fillValues(df.size(), true);
+    } else if (pattern.equals(Pattern.UP)) {
+      patternMatch = df.getDoubles(COL_DIFF).gt(0);
+    } else {
+      patternMatch = df.getDoubles(COL_DIFF).lt(0);
+    }
+    df.addSeries(COL_PATTERN, patternMatch);
+
+    // deviation at least as large as the error bound
+    df.addSeries(COL_DIFF_VIOLATION, df.getDoubles(COL_DIFF).abs().gte(df.getDoubles(COL_ERROR)));
+    df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN, COL_DIFF_VIOLATION);
+
+    // anomalies
+    return DetectionUtils.makeAnomalies(trainingSlice, df, COL_ANOMALY,
+        window.getEndMillis(),
+        DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, datasetConfig),
+        datasetConfig);
+  }
+
+  /**
+   * Break a time period into parts and fetch it one by one, and put it together into a DataFrame.
+   *
+   * The range is cut into slices of PARTITION_PERIOD milliseconds plus one trailing slice for the
+   * remainder. All slices are requested in a single fetch, appended into one frame, sorted by
+   * time, and rows repeating the timestamp of the row before them are dropped.
+   *
+   * @param metricEntity metric entity
+   * @param start start timestamp
+   * @param end end timestamp
+   * @return Data Frame that has data from start to end, sorted by time, one row per timestamp
+   */
+  private DataFrame fetchDataByPart(MetricEntity metricEntity, long start, long end) {
+    List<MetricSlice> slices = new ArrayList<>();
+    long partStart = start;
+    int parts = (int) ((end - start) / PARTITION_PERIOD);
+    for (int i = 0; i < parts; i++) {
+      slices.add(MetricSlice.from(metricEntity.getId(), partStart, partStart + PARTITION_PERIOD,
+          metricEntity.getFilters(), timeGranularity));
+      partStart += PARTITION_PERIOD;
+    }
+    // trailing slice covering whatever is left after the full partitions
+    slices.add(MetricSlice.from(metricEntity.getId(), partStart, end,
+        metricEntity.getFilters(), timeGranularity));
+
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec().withTimeseriesSlices(slices)
+        .withMetricIdsForDataset(Collections.singletonList(metricEntity.getId())));
+
+    // start from an empty (time, value) frame so the result has both columns even without data
+    DataFrame df = new DataFrame();
+    df.addSeries(COL_TIME, LongSeries.buildFrom());
+    df.addSeries(COL_VALUE, DoubleSeries.buildFrom());
+    df.setIndex(COL_TIME);
+
+    for (DataFrame part : data.getTimeseries().values()) {
+      df = df.append(part);
+    }
+    df = df.sortedBy(COL_TIME);
+
+    // Remove duplicates. After dropping row i the next row slides into position i, so the index
+    // only advances when the current row is kept; otherwise runs of 3+ equal timestamps (or a
+    // duplicate pair right behind a removed row) would survive.
+    if (df.size() > 1) {
+      long prev = df.getLongs(COL_TIME).get(0);
+      int i = 1;
+      while (i < df.size()) {
+        long curr = df.getLongs(COL_TIME).get(i);
+        if (curr == prev) {
+          df = df.slice(0, i).append(df.slice(i + 1, df.size()));
+        } else {
+          prev = curr;
+          i++;
+        }
+      }
+    }
+    return df;
+  }
+
+  /**
+   * Returns a data frame containing the values observed at the same time of day on the days
+   * before the given timestamp, oldest first.
+   *
+   * For each of up to LOOKBACK preceding days the row exactly one day earlier is looked up. If it
+   * is missing, the search steps back {@code period} days at a time (at most six steps); if that
+   * also fails, the value at {@code time} itself is plugged in for that day. The walk stops early
+   * once the day to look up lies before the first timestamp of the input.
+   *
+   * @param originalDF the original dataframe
+   * @param time the time of day
+   * @return dataframe containing same time of daily data for LOOKBACK number of days
+   */
+  private DataFrame getDailyDF(DataFrame originalDF, Long time) {
+    LongSeries longSeries = (LongSeries) originalDF.get(COL_TIME);
+    Long start = longSeries.getLong(0);
+    DateTime dt = new DateTime(time);
+    DataFrame df = DataFrame.builder(COL_TIME, COL_VALUE).build();
+
+    for (int i = 0; i < LOOKBACK; i++) {
+      DateTime subDt = dt.minusDays(1);
+      long t = subDt.getMillis();
+
+      if (t < start) {
+        break;
+      }
+
+      int index = longSeries.find(t);
+
+      // If the 1 day look back data doesn't exist, use wow data: step back one period at a
+      // time, giving up after the sixth unsuccessful step.
+      int failedSteps = 0;
+      while (index == -1) {
+        subDt = subDt.minusDays(period);
+        index = longSeries.find(subDt.getMillis());
+        if (index == -1 && ++failedSteps > 5) {
+          break;
+        }
+      }
+
+      if (index != -1) {
+        // exactly one row per day: a hit on the last step must not also trigger the fallback
+        df = df.append(originalDF.slice(index, index + 1));
+      } else {
+        // nothing found within the search range, plug in the last value
+        double lastVal = (originalDF.get(COL_VALUE)).getDouble(longSeries.find(time));
+        DateTime lastDt = dt.minusDays(1);
+        // NOTE(review): the builder receives a DateTime for the time column here - confirm it
+        // ends up as epoch millis compatible with the rows appended above.
+        DataFrame append = DataFrame.builder(COL_TIME, COL_VALUE).append(lastDt, lastVal).build();
+        df = df.append(append);
+      }
+      dt = dt.minusDays(1);
+    }
+    // rows were collected newest first
+    df = df.reverse();
+    return df;
+  }
+
+  private static double calculateInitialLevel(double[] y) {
+    return y[0];
+  }
+
+  /**
+   * See: http://www.itl.nist.gov/div898/handbook/pmc/section4/pmc435.htm
+   *
+   * @return - Initial trend - Bt[1]
+   */
+  private static double calculateInitialTrend(double[] y, int period) {
+    double sum = 0;
+
+    for (int i = 0; i < period; i++) {
+      sum += (y[period + i] - y[i]);
+    }
+
+    return sum / (period * period);
+  }
+
+  /**
+   * See: http://www.itl.nist.gov/div898/handbook/pmc/section4/pmc435.htm
+   *
+   * @return - Seasonal Indices.
+   */
+  private static double[] calculateSeasonalIndices(double[] y, int period,
+      int seasons) {
+
+    double[] seasonalAverage = new double[seasons];
+    double[] seasonalIndices = new double[period];
+
+    double[] averagedObservations = new double[y.length];
+
+    for (int i = 0; i < seasons; i++) {
+      for (int j = 0; j < period; j++) {
+        seasonalAverage[i] += y[(i * period) + j];
+      }
+      seasonalAverage[i] /= period;
+    }
+
+    for (int i = 0; i < seasons; i++) {
+      for (int j = 0; j < period; j++) {
+        averagedObservations[(i * period) + j] = y[(i * period) + j]
+            / seasonalAverage[i];
+      }
+    }
+
+    for (int i = 0; i < period; i++) {
+      for (int j = 0; j < seasons; j++) {
+        seasonalIndices[i] += averagedObservations[(j * period) + i];
+      }
+      seasonalIndices[i] /= seasons;
+    }
+
+    return seasonalIndices;
+  }
+
+  /**
+   * Holt Winters forecasting method
+   *
+   * @param y Timeseries to be forecasted
+   * @param alpha alpha
+   * @param beta beta
+   * @param gamma gamma
+   * @return double[] {y_hat, SSE, error_boundary}
+   */
+  private double[] forecast(double[] y, double alpha, double beta, double 
gamma) {
+    double[] It = new double[y.length+1];
+    double[] Ft = new double[y.length+1];
+
+    double a0 = calculateInitialLevel(y);
+    double b0 = calculateInitialTrend(y, period);
+
+    int seasons = y.length / period;
+    double[] initialSeasonalIndices = calculateSeasonalIndices(y, period,
+        seasons);
+
+    for (int i = 0; i < period; i++) {
+      It[i] = initialSeasonalIndices[i];
+    }
+
+    double s = a0;
+    double t = b0;
+    double predictedValue = 0;
+
+    for (int i = 0; i < y.length; i++) {
+      double sNew;
+      double tNew;
+      Ft[i] = (s + t) * It[i];
+      sNew = alpha * (y[i] / It[i]) + (1 - alpha) * (s + t);
+      tNew = beta * (sNew - s) + (1 - beta) * t;
+      if (i + period <= y.length) {
+        It[i + period] = gamma * (y[i] / (sNew * It[i])) + (1 - gamma) * It[i];
+      }
+      s = sNew;
+      t = tNew;
+      if (i == y.length - 1) {
+        predictedValue = (s+t) * It[i+1];
+      }
+    }
+
+    List<Double> diff = new ArrayList<>();
+    double sse = 0;
+    for (int i = 0; i < y.length; i++) {
+      if (Ft[i] != 0) {
+        sse += Math.pow(y[i] - Ft[i], 2);
+        diff.add(Math.abs(Ft[i] - y[i]));
+      }
+    }
+
+    double error = calculateErrorBound(diff, sensitivityToZscore(sensitivity));
+    return new double[]{predictedValue, sse, error};
+  }
+
+  /**
+   * Forecasts every timestamp of the input that is at or after the window start.
+   *
+   * For each such timestamp the same-time-of-day history is collected with getDailyDF and fed to
+   * forecast. When alpha, beta and gamma are all negative the parameters are fitted with
+   * fitModelWithBOBYQA, starting from the values used for the previous timestamp; otherwise the
+   * configured values are used. Timestamps with fewer than two periods of history are skipped,
+   * and their rows keep the array defaults (timestamp 0, value 0, error 0).
+   *
+   * @param inputDF training dataframe
+   * @param windowStartTime prediction start time
+   * @return DataFrame with timestamp, baseline, error bound
+   */
+  private DataFrame computePredictionInterval(DataFrame inputDF, long windowStartTime) {
+    DataFrame forecastDF = inputDF.filter(new Series.LongConditional() {
+      @Override
+      public boolean apply(long... values) {
+        return values[0] >= windowStartTime;
+      }
+    }, COL_TIME).dropNull();
+
+    final int size = forecastDF.size();
+    long[] timestamps = new long[size];
+    double[] baselines = new double[size];
+    double[] errorBounds = new double[size];
+
+    // all three negative means "fit the parameters from the data"
+    final boolean fitParameters = alpha < 0 && beta < 0 && gamma < 0;
+    double currentAlpha = this.alpha;
+    double currentBeta = this.beta;
+    double currentGamma = this.gamma;
+
+    for (int k = 0; k < size; k++) {
+      final long timestamp = forecastDF.getLong(COL_TIME, k);
+      DataFrame dailyDF = getDailyDF(inputDF, timestamp);
+
+      // We need at least 2 periods of data
+      if (dailyDF.size() < 2 * period) {
+        continue;
+      }
+
+      timestamps[k] = timestamp;
+
+      double[] y = dailyDF.getDoubles(COL_VALUE).values();
+      if (fitParameters) {
+        // warm start from the parameters of the previous timestamp
+        double[] fittedParams = fitModelWithBOBYQA(y, currentAlpha, currentBeta, currentGamma);
+        currentAlpha = fittedParams[0];
+        currentBeta = fittedParams[1];
+        currentGamma = fittedParams[2];
+      } else {
+        currentAlpha = alpha;
+        currentBeta = beta;
+        currentGamma = gamma;
+      }
+
+      double[] result = forecast(y, currentAlpha, currentBeta, currentGamma);
+      baselines[k] = result[0];
+      errorBounds[k] = result[2];
+    }
+
+    DataFrame resultDF = new DataFrame();
+    resultDF.addSeries(COL_TIME, LongSeries.buildFrom(timestamps));
+    resultDF.setIndex(COL_TIME);
+    resultDF.addSeries(COL_VALUE, DoubleSeries.buildFrom(baselines));
+    resultDF.addSeries(COL_ERROR, DoubleSeries.buildFrom(errorBounds));
+    return resultDF;
+  }
+
+  /**
+   * Returns the error bound of given list based on mean, std and given zscore
+   *
+   * @param givenNumbers double list
+   * @param zscore zscore used to multiply by std
+   * @return the error bound
+   */
+  private static double calculateErrorBound(List<Double> givenNumbers, double 
zscore) {
+    // calculate the mean value (= average)
+    double sum = 0.0;
+    for (double num : givenNumbers) {
+      sum += num;
+    }
+    double mean = sum / givenNumbers.size();
+
+    // calculate standard deviation
+    double squaredDifferenceSum = 0.0;
+    for (double num : givenNumbers) {
+      squaredDifferenceSum += (num - mean) * (num - mean);
+    }
+    double variance = squaredDifferenceSum / givenNumbers.size();
+    double standardDeviation = Math.sqrt(variance);
+
+    return zscore * standardDeviation;
+  }
+
+  /**
+   * Fit alpha, beta, gamma by optimizing SSE (Sum of squared errors)
+   *
+   * @param y the data
+   * @param lastAlpha last alpha value
+   * @param lastBeta last beta value
+   * @param lastGamma last gamma value
+   * @return double array containing fitted alpha, beta and gamma
+   */
+  private double[] fitModelWithBOBYQA(double[] y, double lastAlpha, double 
lastBeta, double lastGamma) {
+    BOBYQAOptimizer optimizer = new BOBYQAOptimizer(7);
+    if (lastAlpha < 0) {
+      lastAlpha = 0.1;
+    }
+    if (lastBeta < 0) {
+      lastBeta = 0.01;
+    }
+    if (lastGamma < 0) {
+      lastGamma = 0.001;
+    }
+    InitialGuess initGuess = new InitialGuess(new double[]{lastAlpha, 
lastBeta, lastGamma});;
+    MaxIter maxIter = new MaxIter(30000);
+    MaxEval maxEval  = new MaxEval(30000);
+    GoalType goal = GoalType.MINIMIZE;
+    ObjectiveFunction objectiveFunction = new ObjectiveFunction(new 
MultivariateFunction() {
+      public double value(double[] params) {
+        return forecast(y, params[0], params[1], params[2])[1];
+      }
+    });
+    SimpleBounds bounds = new SimpleBounds(new double[]{0.001, 0.001, 0.001}, 
new double[]{0.999, 0.999, 0.999});;
+
+    double[] params;
+
+    try {
+      PointValuePair optimal = optimizer.optimize(objectiveFunction, goal, 
bounds, initGuess, maxIter, maxEval);
+      params = optimal.getPoint();
+    } catch (Exception e) {
+      LOG.error(e.toString());
+      params = new double[3];
+      params[0] = lastAlpha;
 
 Review comment:
   OK, I have created a HWParams class.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to