tangdian commented on a change in pull request #4067: [TE] Holt Winters detector
URL: https://github.com/apache/incubator-pinot/pull/4067#discussion_r275880161
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
 ##########
 @@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.thirdeye.detection.components;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.math3.analysis.MultivariateFunction;
+import org.apache.commons.math3.optim.PointValuePair;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.LongSeries;
+import org.apache.pinot.thirdeye.dataframe.Series;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
+import org.apache.pinot.thirdeye.detection.Pattern;
+import org.apache.pinot.thirdeye.detection.algorithm.AlgorithmUtils;
+import org.apache.pinot.thirdeye.detection.annotation.Components;
+import org.apache.pinot.thirdeye.detection.annotation.DetectionTag;
+import org.apache.pinot.thirdeye.detection.annotation.Param;
+import org.apache.pinot.thirdeye.detection.spec.HoltWintersDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
+import org.apache.pinot.thirdeye.detection.spi.model.InputData;
+import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.apache.commons.math3.optim.MaxIter;
+import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
+import org.apache.commons.math3.optim.MaxEval;
+import org.apache.commons.math3.optim.SimpleBounds;
+import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
+import org.apache.commons.math3.optim.InitialGuess;
+import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+/**
+ * Holt-Winters forecasting algorithm with multiplicative method
+ * Supports seasonality and trend detection
+ * https://otexts.com/fpp2/holt-winters.html
+ */
+@Components(title = "Holt Winters triple exponential smoothing forecasting and 
detection",
+    type = "HOLT_WINTERS_RULE",
+    tags = {DetectionTag.RULE_DETECTION},
+    description = "Forecast with holt winters triple exponential smoothing and 
generate anomalies",
+    params = {
+        @Param(name = "alpha"),
+        @Param(name = "beta"),
+        @Param(name = "gamma"),
+        @Param(name = "period"),
+        @Param(name = "pattern"),
+        @Param(name = "sensitivity"),
+        @Param(name = "kernelSmoothing")})
+public class HoltWintersDetector implements 
BaselineProvider<HoltWintersDetectorSpec>,
+                                            
AnomalyDetector<HoltWintersDetectorSpec> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoltWintersDetector.class);
+  private InputDataFetcher dataFetcher; // set in init(); used for time series and dataset config lookups
+  private static final String COL_CURR = "current"; // observed values
+  private static final String COL_BASE = "baseline"; // predicted values
+  private static final String COL_ANOMALY = "anomaly"; // final per-row anomaly flag
+  private static final String COL_PATTERN = "pattern"; // true where the deviation direction matches the configured pattern
+  private static final String COL_DIFF = "diff"; // current minus baseline
+  private static final String COL_DIFF_VIOLATION = "diff_violation"; // true where |diff| >= error bound
+  private static final String COL_ERROR = "error"; // error bound column compared against |diff|
+  private static final long KERNEL_PERIOD = 3600000L; // 3,600,000 ms = 1 hour; divided by the bucket size to get the smoothing kernel width
+  private static final int LOOKBACK = 60; // days: builds lookbackPeriod and caps the day walk in getDailyDF
+
+  private int period; // seasonal period, in number of data points (also used as a day step in getDailyDF)
+  private double alpha; // level smoothing factor
+  private double beta; // trend smoothing factor
+  private double gamma; // seasonality smoothing factor
+  private Pattern pattern; // direction of deviations to report: UP, UP_OR_DOWN, or otherwise treated as down
+  private TimeGranularity timeGranularity; // granularity used when building metric slices
+  private double sensitivity; // converted to a z-score when computing the error bound
+  private String monitoringGranularity; // raw granularity string from the spec, e.g. "1_MONTHS"
+  private boolean smoothing; // enables kernel smoothing of the input in runDetection
+  private Period lookbackPeriod = ConfigUtils.parsePeriod(LOOKBACK + "DAYS"); // training history fetched before the window start
+
+  /**
+   * Configures this detector from its spec and stores the data fetcher.
+   *
+   * <p>The slice granularity is derived from the monitoring granularity: the
+   * native granularity is used for "1_MONTHS", otherwise the granularity string
+   * from the spec is parsed.
+   *
+   * @param spec detector configuration
+   * @param dataFetcher fetcher used later to load time series and dataset configs
+   */
+  @Override
+  public void init(HoltWintersDetectorSpec spec, InputDataFetcher dataFetcher) {
+    this.dataFetcher = dataFetcher;
+
+    // Smoothing factors and seasonal period
+    this.alpha = spec.getAlpha();
+    this.beta = spec.getBeta();
+    this.gamma = spec.getGamma();
+    this.period = spec.getPeriod();
+
+    // Detection behaviour
+    this.pattern = spec.getPattern();
+    this.sensitivity = spec.getSensitivity();
+    this.smoothing = spec.getSmoothing();
+
+    // Granularity used for metric slices
+    this.monitoringGranularity = spec.getMonitoringGranularity();
+    final boolean monthly = this.monitoringGranularity.equals("1_MONTHS");
+    this.timeGranularity = monthly
+        ? MetricSlice.NATIVE_GRANULARITY
+        : TimeGranularity.fromString(spec.getMonitoringGranularity());
+  }
+
+  @Override
+  public TimeSeries computePredictedTimeSeries(MetricSlice slice) {
+    MetricEntity metricEntity = MetricEntity.fromSlice(slice, 0);
+    Interval window = new Interval(slice.getStart(), slice.getEnd());
+    DateTime trainStart = window.getStart().minus(lookbackPeriod);
+    DataFrame inputDf = fetchData(metricEntity, trainStart.getMillis(), 
window.getEndMillis());
+    DataFrame resultDF = computePredictionInterval(inputDf, 
window.getStartMillis());
+
+    // Exclude the end because baseline calculation should not contain the end
+    if (resultDF.size() > 1) {
+      resultDF = resultDF.head(resultDF.size() - 1);
+    }
+
+    return TimeSeries.fromDataFrame(resultDF);
+  }
+
+  /**
+   * Detects anomalies for a metric within the given window.
+   *
+   * <p>Steps: fetch the dataset config and the input series (window plus
+   * {@code lookbackPeriod} of history), optionally kernel-smooth the input,
+   * compute the baseline from the window start onwards, then flag rows whose
+   * deviation matches the configured pattern and whose absolute deviation
+   * reaches the error bound column.
+   *
+   * @param window detection window
+   * @param metricUrn URN of the metric to analyse
+   * @return anomalies built from the flagged rows
+   */
+  @Override
+  public List<MergedAnomalyResultDTO> runDetection(Interval window, String metricUrn) {
+    MetricEntity metricEntity = MetricEntity.fromURN(metricUrn);
+    long trainStartMillis = window.getStart().minus(lookbackPeriod).getMillis();
+    long windowEndMillis = window.getEndMillis();
+
+    InputDataSpec datasetSpec =
+        new InputDataSpec().withMetricIdsForDataset(Collections.singleton(metricEntity.getId()));
+    DatasetConfigDTO datasetConfig =
+        this.dataFetcher.fetchData(datasetSpec).getDatasetForMetricId().get(metricEntity.getId());
+    MetricSlice sliceData = MetricSlice.from(metricEntity.getId(), trainStartMillis, windowEndMillis,
+        metricEntity.getFilters(), timeGranularity);
+    DataFrame dfInput = fetchData(metricEntity, trainStartMillis, windowEndMillis);
+
+    // Kernel smoothing: only when enabled and the dataset is not bucketed by day
+    if (smoothing && !TimeUnit.DAYS.equals(datasetConfig.bucketTimeGranularity().getUnit())) {
+      // Number of buckets that fit into one kernel period
+      int kernelSize = (int) (KERNEL_PERIOD / datasetConfig.bucketTimeGranularity().toMillis());
+      if (kernelSize > 1) {
+        double[] values = dfInput.getDoubles(COL_VALUE).values();
+        int kernelOffset = kernelSize / 2;
+        int lastWindowStart = values.length - kernelSize;
+        for (int windowStart = 0; windowStart <= lastWindowStart; windowStart++) {
+          // Robust mean over the kernel window, written to the window's centre
+          values[windowStart + kernelOffset] = AlgorithmUtils
+              .robustMean(dfInput.getDoubles(COL_VALUE).slice(windowStart, windowStart + kernelSize), kernelSize)
+              .getDouble(kernelSize - 1);
+        }
+        dfInput.addSeries(COL_VALUE, values);
+      }
+    }
+
+    // Current vs. baseline
+    DataFrame dfCurr = new DataFrame(dfInput).renameSeries(COL_VALUE, COL_CURR);
+    DataFrame dfBase =
+        computePredictionInterval(dfInput, window.getStartMillis()).renameSeries(COL_VALUE, COL_BASE);
+    DataFrame df = new DataFrame(dfCurr).addSeries(dfBase);
+    df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_BASE)));
+    df.addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false));
+
+    // Direction filter: every row for UP_OR_DOWN, positive diffs for UP, negative diffs otherwise
+    if (pattern.equals(Pattern.UP_OR_DOWN)) {
+      df.addSeries(COL_PATTERN, BooleanSeries.fillValues(df.size(), true));
+    } else if (pattern.equals(Pattern.UP)) {
+      df.addSeries(COL_PATTERN, df.getDoubles(COL_DIFF).gt(0));
+    } else {
+      df.addSeries(COL_PATTERN, df.getDoubles(COL_DIFF).lt(0));
+    }
+
+    // Magnitude filter against the error bound, then combine both filters into the anomaly column
+    df.addSeries(COL_DIFF_VIOLATION, df.getDoubles(COL_DIFF).abs().gte(df.getDoubles(COL_ERROR)));
+    df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN, COL_DIFF_VIOLATION);
+
+    // Anomalies
+    return DetectionUtils.makeAnomalies(sliceData, df, COL_ANOMALY, windowEndMillis,
+        DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, datasetConfig), datasetConfig);
+  }
+
+  /**
+   * Fetches the time series of a metric for the given time range, using this
+   * detector's configured time granularity.
+   *
+   * @param metricEntity metric entity (provides the metric id and filters)
+   * @param start start timestamp
+   * @param end end timestamp
+   * @return Data Frame that has data from start to end, as returned by the
+   *         data fetcher for the requested slice
+   */
+  private DataFrame fetchData(MetricEntity metricEntity, long start, long end) {
+    MetricSlice sliceData = MetricSlice.from(metricEntity.getId(), start, end,
+        metricEntity.getFilters(), timeGranularity);
+
+    // Parameterized logging: keeps a space before the slice and avoids eager string building
+    LOG.info("Getting data for {}", sliceData);
+
+    InputData data = this.dataFetcher.fetchData(new InputDataSpec()
+        .withTimeseriesSlices(Collections.singletonList(sliceData))
+        .withMetricIdsForDataset(Collections.singletonList(metricEntity.getId())));
+
+    return data.getTimeseries().get(sliceData);
+  }
+
+  /**
+   * Returns a data frame containing the same time daily data, based on input 
time
+   * @param originalDF the original dataframe
+   * @param time the epoch time of the start of the day
+   * @return DataFrame containing same time of daily data for LOOKBACK number 
of days
+   */
+  private DataFrame getDailyDF(DataFrame originalDF, Long time) {
+    LongSeries longSeries = (LongSeries) originalDF.get(COL_TIME);
+    long start = longSeries.getLong(0);
+    DateTime dt = new DateTime(time);
+    DataFrame df = DataFrame.builder(COL_TIME, COL_VALUE).build();
+
+    for (int i = 0; i < LOOKBACK; i++) {
+      DateTime subDt = new DateTime(dt);
+      subDt = subDt.minusDays(1);
+      long t = subDt.getMillis();
+      if (t < start) {
+        break;
+      }
+      int index = longSeries.find(t);
+      if (index != -1) {
+        df = df.append(originalDF.slice(index, index + 1));
+      } else { // If the 1 day look back data doesn't exist, use the data one 
period before
+        int backtrackCounter = 0; // Counter for how many periods we have 
searched for
+        while (index == -1) { // Stop when we found data
+          subDt = subDt.minusDays(period);
+          long tWO1W = subDt.getMillis();
+          index = longSeries.find(tWO1W);
+          if (index != -1) {
+            df = df.append(originalDF.slice(index, index + 1));
+          }
+          if (backtrackCounter > 4) { // Search up to 4 weeks, otherwise just 
append last value to next time frame
+            double lastVal = 
(originalDF.get(COL_VALUE)).getDouble(longSeries.find(dt.getMillis())); // The 
last value
+            DateTime nextDt = dt.minusDays(1); // The next datetime to be 
appended
+            DataFrame appendDf = DataFrame.builder(COL_TIME, 
COL_VALUE).append(nextDt, lastVal).build();
+            df = df.append(appendDf);
+            break;
+          }
+          backtrackCounter++;
+        }
+      }
+      dt = dt.minusDays(1);
+    }
+    df = df.reverse();
+    return df;
+  }
+
+  /**
+   * Returns the initial level for the smoothing recursion: the first
+   * observation of the series.
+   *
+   * @param y observed series; must contain at least one element
+   * @return {@code y[0]}
+   */
+  private static double calculateInitialLevel(double[] y) {
+    return y[0];
+  }
+
+  /**
+   * Estimates the initial trend from the first two seasons: the sum of
+   * {@code y[period + i] - y[i]} over the first period, divided by
+   * {@code period * period}.
+   *
+   * See: http://www.itl.nist.gov/div898/handbook/pmc/section4/pmc435.htm
+   *
+   * @param y observed series; must contain at least {@code 2 * period} elements
+   * @param period number of observations per season
+   * @return - Initial trend - Bt[1]
+   */
+  private static double calculateInitialTrend(double[] y, int period) {
+    double total = 0;
+
+    // Walk the second season and compare each point with its counterpart one season earlier
+    for (int second = period; second < 2 * period; second++) {
+      total += y[second] - y[second - period];
+    }
+
+    return total / (period * period);
+  }
+
+  /**
+   * See: http://www.itl.nist.gov/div898/handbook/pmc/section4/pmc435.htm
+   *
+   * @return - Seasonal Indices.
+   */
+  private static double[] calculateSeasonalIndices(double[] y, int period,
+      int seasons) {
+    double[] seasonalAverage = new double[seasons];
+    double[] seasonalIndices = new double[period];
+
+    double[] averagedObservations = new double[y.length];
+
+    for (int i = 0; i < seasons; i++) {
+      for (int j = 0; j < period; j++) {
+        seasonalAverage[i] += y[(i * period) + j];
+      }
+      seasonalAverage[i] /= period;
+    }
+
+    for (int i = 0; i < seasons; i++) {
+      for (int j = 0; j < period; j++) {
+        averagedObservations[(i * period) + j] = y[(i * period) + j]
+            / seasonalAverage[i];
+      }
+    }
+
+    for (int i = 0; i < period; i++) {
+      for (int j = 0; j < seasons; j++) {
+        seasonalIndices[i] += averagedObservations[(j * period) + i];
+      }
+      seasonalIndices[i] /= seasons;
+    }
+
+    return seasonalIndices;
+  }
+
+  /**
+   * Holt Winters forecasting method
+   *
+   * @param y Timeseries to be forecasted
+   * @param alpha level smoothing factor
+   * @param beta trend smoothing factor
+   * @param gamma seasonality smoothing factor
+   * @return double[] {y_hat, SSE, error_boundary}
+   */
+  private double[] forecast(double[] y, double alpha, double beta, double 
gamma) {
+    double[] It = new double[y.length+1];
+    double[] Ft = new double[y.length+1];
+
+    double a0 = calculateInitialLevel(y);
+    double b0 = calculateInitialTrend(y, period);
+
+    int seasons = y.length / period;
+    double[] initialSeasonalIndices = calculateSeasonalIndices(y, period,
+        seasons);
+
+    for (int i = 0; i < period; i++) {
+      It[i] = initialSeasonalIndices[i];
+    }
+
+    double s = a0;
+    double t = b0;
+    double predictedValue = 0;
+
+    for (int i = 0; i < y.length; i++) {
+      double sNew;
+      double tNew;
+      Ft[i] = (s + t) * It[i];
+      sNew = alpha * (y[i] / It[i]) + (1 - alpha) * (s + t);
+      tNew = beta * (sNew - s) + (1 - beta) * t;
+      if (i + period <= y.length) {
+        It[i + period] = gamma * (y[i] / (sNew * It[i])) + (1 - gamma) * It[i];
+      }
+      s = sNew;
+      t = tNew;
+      if (i == y.length - 1) {
+        predictedValue = (s+t) * It[i+1];
+      }
+    }
+
+    List<Double> diff = new ArrayList<>();
+    double sse = 0;
+    for (int i = 0; i < y.length; i++) {
+      if (Ft[i] != 0) {
+        sse += Math.pow(y[i] - Ft[i], 2);
+        diff.add(Math.abs(Ft[i] - y[i]));
+      }
+    }
+
+    double error = calculateErrorBound(diff, sensitivityToZscore(sensitivity));
+    return new double[]{predictedValue, sse, error};
+  }
+
+  /**
+   *
+   * @param inputDF training dataframe
+   * @param windowStartTime prediction start time
+   * @return DataFrame with timestamp, baseline, error bound
+   */
+  private DataFrame computePredictionInterval(DataFrame inputDF, long 
windowStartTime) {
+
+    DataFrame resultDF = new DataFrame();
+    DataFrame forecastDF = inputDF.filter(new Series.LongConditional() {
+      @Override
+      public boolean apply(long... values) {
+        return values[0] >= windowStartTime;
+      }
+    }, COL_TIME).dropNull();
+
+    int size = forecastDF.size();
+    double[] resultArray = new double[size];
+    long[] resultTimeArray = new long[size];
+    double[] errorArray = new double[size];
+
+    double lastAlpha = this.alpha;
 
 Review comment:
   Good point. I accidentally deleted some code further down, which is why this 
didn't make sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to