http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
new file mode 100644
index 0000000..ff8dbcf
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.methods.kstest;
+
+import org.apache.ambari.metrics.alertservice.prototype.RFunctionInvoker;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Anomaly detection technique that delegates a Kolmogorov-Smirnov test to
+ * {@link RFunctionInvoker#ksTest} and keeps one tunable p-value per metric key.
+ */
+public class KSTechnique implements Serializable {
+
+  private String methodType = "ks";
+  private Map<String, Double> pValueMap;
+  private static final Log LOG = LogFactory.getLog(KSTechnique.class);
+
+  /** p-value registered for a metric key the first time it is seen. */
+  private static final double DEFAULT_P_VALUE = 0.05;
+  /** Result set row that is reported as the anomaly score (read as "dValue"). */
+  private static final int D_VALUE_ROW = 2;
+  /** Result set row that is logged as "p_value". */
+  private static final int P_VALUE_ROW = 3;
+
+  public KSTechnique() {
+    pValueMap = new HashMap<>();
+  }
+
+  /**
+   * Runs the KS test for the given metric and wraps the outcome in a {@link MetricAnomaly}.
+   *
+   * @param key       metric key; registered with the default p-value on first use
+   * @param trainData training series; must be at least as long as the test series
+   * @param testData  test series; its last point supplies the anomaly timestamp and value
+   * @return the anomaly built from the last test point and the reported dValue, or
+   * {@code null} when the test series is empty, there is not enough training data,
+   * the R invocation returned nothing, or the result set is empty or incomplete
+   */
+  public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) {
+
+    int testLength = testData.values.length;
+    int trainLength = trainData.values.length;
+
+    // Without this guard the last-point lookup below would index position -1.
+    if (testLength == 0) {
+      LOG.info("Not enough test data.");
+      return null;
+    }
+
+    if (trainLength < testLength) {
+      LOG.info("Not enough train data.");
+      return null;
+    }
+
+    pValueMap.putIfAbsent(key, DEFAULT_P_VALUE);
+    double pValue = pValueMap.get(key);
+
+    ResultSet result = RFunctionInvoker.ksTest(trainData, testData,
+      Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
+    if (result == null) {
+      LOG.error("Resultset is null when invoking KS R function...");
+      return null;
+    }
+
+    int rows = result.resultset.size();
+    if (rows == 0) {
+      return null;
+    }
+
+    // Rows 2 and 3 are read below, so a shorter or empty-row result cannot be used.
+    if (rows <= P_VALUE_ROW
+      || result.resultset.get(D_VALUE_ROW).length == 0
+      || result.resultset.get(P_VALUE_ROW).length == 0) {
+      LOG.warn("Incomplete KS result set, number of rows = " + rows);
+      return null;
+    }
+
+    LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
+    LOG.info("p_value = " + result.resultset.get(P_VALUE_ROW)[0]);
+    double dValue = result.resultset.get(D_VALUE_ROW)[0];
+
+    int last = testLength - 1;
+    return new MetricAnomaly(key,
+      (long) testData.ts[last],
+      testData.values[last],
+      methodType,
+      dValue);
+  }
+
+  /**
+   * Scales the stored p-value of a metric up or down by a percentage of its current value.
+   * The result is kept inside the valid probability range [0, 1]. Unknown metric keys are
+   * logged and left untouched.
+   *
+   * @param metricKey           key of a metric previously seen by {@link #runKsTest}
+   * @param increaseSensitivity {@code true} to raise the p-value, {@code false} to lower it
+   * @param percent             size of the change, as a percentage of the current p-value
+   */
+  public void updateModel(String metricKey, boolean increaseSensitivity, double percent) {
+
+    LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = "
+      + increaseSensitivity + ", percent = " + percent);
+
+    if (!pValueMap.containsKey(metricKey)) {
+      LOG.error("Unknown metric key : " + metricKey);
+      LOG.info("pValueMap :" + pValueMap.toString());
+      return;
+    }
+
+    double delta = percent / 100;
+    if (!increaseSensitivity) {
+      delta = delta * -1;
+    }
+
+    double pValue = pValueMap.get(metricKey);
+    // Clamp on both sides: a decrease of more than 100% would otherwise go negative.
+    double newPValue = Math.max(0.0, Math.min(1.0, pValue + delta * pValue));
+    pValueMap.put(metricKey, newPValue);
+    LOG.info("New pValue = " + newPValue);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
new file mode 100644
index 0000000..a8e31bf
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+/**
+ * A generator of metric values that can be consumed one value at a time or in batches.
+ */
+public interface AbstractMetricSeries {
+
+  /**
+   * @return the next value produced by this series
+   */
+  double nextValue();
+
+  /**
+   * @param n number of values requested
+   * @return an array of generated values
+   */
+  double[] getSeries(int n);
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
new file mode 100644
index 0000000..4158ff4
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Series that alternates between a run of "low band" values and a run of "high band"
+ * values. Each value is drawn from the band's centre value plus or minus
+ * (deviation * centre value).
+ */
+public class DualBandMetricSeries implements AbstractMetricSeries {
+
+  double lowBandValue = 0.0;
+  double lowBandDeviationPercentage = 0.0;
+  int lowBandPeriodSize = 10;
+  double highBandValue = 1.0;
+  double highBandDeviationPercentage = 0.0;
+  int highBandPeriodSize = 10;
+
+  Random random = new Random();
+  double lowBandValueLowerLimit, lowBandValueHigherLimit;
+  double highBandLowerLimit, highBandUpperLimit;
+  // Number of values already emitted from the low (l) and high (h) band in the current cycle.
+  int l = 0, h = 0;
+
+  /**
+   * @param lowBandValue                centre value of the low band
+   * @param lowBandDeviationPercentage  deviation of the low band, as a fraction of its centre
+   * @param lowBandPeriodSize           number of consecutive low band values per cycle
+   * @param highBandValue               centre value of the high band
+   * @param highBandDeviationPercentage deviation of the high band, as a fraction of its centre
+   * @param highBandPeriodSize          number of consecutive high band values per cycle
+   */
+  public DualBandMetricSeries(double lowBandValue,
+                              double lowBandDeviationPercentage,
+                              int lowBandPeriodSize,
+                              double highBandValue,
+                              double highBandDeviationPercentage,
+                              int highBandPeriodSize) {
+    this.lowBandValue = lowBandValue;
+    this.lowBandDeviationPercentage = lowBandDeviationPercentage;
+    this.lowBandPeriodSize = lowBandPeriodSize;
+    this.highBandValue = highBandValue;
+    this.highBandDeviationPercentage = highBandDeviationPercentage;
+    this.highBandPeriodSize = highBandPeriodSize;
+    computeBandLimits();
+  }
+
+  /** Derives the lower and upper limit of both bands from the configured values. */
+  private void computeBandLimits() {
+    double lowBandSpread = lowBandDeviationPercentage * lowBandValue;
+    lowBandValueLowerLimit = lowBandValue - lowBandSpread;
+    lowBandValueHigherLimit = lowBandValue + lowBandSpread;
+
+    double highBandSpread = highBandDeviationPercentage * highBandValue;
+    highBandLowerLimit = highBandValue - highBandSpread;
+    highBandUpperLimit = highBandValue + highBandSpread;
+  }
+
+  /** Draws one random value between the two limits. */
+  private double drawBetween(double lowerLimit, double upperLimit) {
+    return lowerLimit + (upperLimit - lowerLimit) * random.nextDouble();
+  }
+
+  /**
+   * Emits the low band run first, then the high band run, then starts the cycle again.
+   *
+   * @return the next value, or 0.0 when neither band has values left to emit
+   */
+  @Override
+  public double nextValue() {
+
+    final double sample;
+    if (l < lowBandPeriodSize) {
+      sample = drawBetween(lowBandValueLowerLimit, lowBandValueHigherLimit);
+      l++;
+    } else if (h < highBandPeriodSize) {
+      sample = drawBetween(highBandLowerLimit, highBandUpperLimit);
+      h++;
+    } else {
+      sample = 0.0;
+    }
+
+    // Both runs are complete: rewind the counters so the next call starts a new cycle.
+    boolean cycleFinished = (l == lowBandPeriodSize) && (h == highBandPeriodSize);
+    if (cycleFinished) {
+      l = 0;
+      h = 0;
+    }
+
+    return sample;
+  }
+
+  /**
+   * @param n number of values to generate
+   * @return the next {@code n} values of the series, in generation order
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    int index = 0;
+    while (index < n) {
+      series[index] = nextValue();
+      index++;
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
new file mode 100644
index 0000000..1e37ff3
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Static factory methods for synthetic metric series, either as a fixed-size array
+ * or as an {@link AbstractMetricSeries} generator.
+ */
+public class MetricSeriesGeneratorFactory {
+
+  /**
+   * Returns a uniformly distributed data series with some deviation % and outliers.
+   *
+   * @param n                                size of the data series
+   * @param value                            the value around which the uniform data series
+   *                                         is centered.
+   * @param deviationPercentage              the allowed deviation % on either side of the
+   *                                         uniform value. For example, if value = 10 and
+   *                                         deviation % is 0.1, the series values lie
+   *                                         between 9 and 11.
+   * @param outlierProbability               the probability of finding an outlier in the
+   *                                         series.
+   * @param outlierDeviationLowerPercentage  min percentage an outlier should be away from
+   *                                         the uniform value. If value = 10 and the
+   *                                         percentage is 30%, the outlier is 7 or 13.
+   * @param outlierDeviationHigherPercentage max percentage an outlier should be away from
+   *                                         the uniform value. If value = 10 and the
+   *                                         percentage is 60%, the outlier is 4 or 16.
+   * @param outliersAboveValue               whether outliers should be greater (true) or
+   *                                         smaller (false) than the value.
+   * @return uniform series
+   */
+  public static double[] createUniformSeries(int n,
+                                             double value,
+                                             double deviationPercentage,
+                                             double outlierProbability,
+                                             double outlierDeviationLowerPercentage,
+                                             double outlierDeviationHigherPercentage,
+                                             boolean outliersAboveValue) {
+
+    UniformMetricSeries metricSeries = new UniformMetricSeries(value,
+      deviationPercentage,
+      outlierProbability,
+      outlierDeviationLowerPercentage,
+      outlierDeviationHigherPercentage,
+      outliersAboveValue);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a normally distributed series.
+   *
+   * @param n                             size of the data series
+   * @param mean                          mean of the distribution
+   * @param sd                            sd of the distribution
+   * @param outlierProbability            the probability of finding an outlier in the series
+   * @param outlierDeviationSDTimesLower  lower limit of the outlier with respect to times
+   *                                      sdev from the mean.
+   * @param outlierDeviationSDTimesHigher higher limit of the outlier with respect to times
+   *                                      sdev from the mean.
+   * @param outlierOnRightEnd             whether outliers should be on the right end (true)
+   *                                      or the left end (false).
+   * @return normal series
+   */
+  public static double[] createNormalSeries(int n,
+                                            double mean,
+                                            double sd,
+                                            double outlierProbability,
+                                            double outlierDeviationSDTimesLower,
+                                            double outlierDeviationSDTimesHigher,
+                                            boolean outlierOnRightEnd) {
+
+    NormalMetricSeries metricSeries = new NormalMetricSeries(mean,
+      sd,
+      outlierProbability,
+      outlierDeviationSDTimesLower,
+      outlierDeviationSDTimesHigher,
+      outlierOnRightEnd);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a monotonically increasing / decreasing series.
+   *
+   * @param n                                size of the data series
+   * @param startValue                       start value of the monotonic sequence
+   * @param slope                            direction of monotonicity: m > 0 for increasing
+   *                                         and m < 0 for decreasing.
+   * @param deviationPercentage              the allowed deviation % on either side of the
+   *                                         current 'y' value. For example, if the current
+   *                                         value is 10 according to the slope and
+   *                                         deviation % is 0.1, the series values lie
+   *                                         between 9 and 11.
+   * @param outlierProbability               the probability of finding an outlier in the
+   *                                         series.
+   * @param outlierDeviationLowerPercentage  min percentage an outlier should be away from
+   *                                         the current 'y' value. If value = 10 and the
+   *                                         percentage is 30%, the outlier is 7 or 13.
+   * @param outlierDeviationHigherPercentage max percentage an outlier should be away from
+   *                                         the current 'y' value. If value = 10 and the
+   *                                         percentage is 60%, the outlier is 4 or 16.
+   * @param outliersAboveValue               whether outliers should be greater (true) or
+   *                                         smaller (false) than the 'y' value.
+   * @return monotonic series
+   */
+  public static double[] createMonotonicSeries(int n,
+                                               double startValue,
+                                               double slope,
+                                               double deviationPercentage,
+                                               double outlierProbability,
+                                               double outlierDeviationLowerPercentage,
+                                               double outlierDeviationHigherPercentage,
+                                               boolean outliersAboveValue) {
+
+    MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue,
+      slope,
+      deviationPercentage,
+      outlierProbability,
+      outlierDeviationLowerPercentage,
+      outlierDeviationHigherPercentage,
+      outliersAboveValue);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Returns a dual band series (lower and higher).
+   *
+   * @param n                           size of the data series
+   * @param lowBandValue                low band centre value
+   * @param lowBandDeviationPercentage  low band deviation
+   * @param lowBandPeriodSize           low band size (number of consecutive points)
+   * @param highBandValue               high band centre value
+   * @param highBandDeviationPercentage high band deviation
+   * @param highBandPeriodSize          high band size (number of consecutive points)
+   * @return dual band series
+   */
+  public static double[] getDualBandSeries(int n,
+                                           double lowBandValue,
+                                           double lowBandDeviationPercentage,
+                                           int lowBandPeriodSize,
+                                           double highBandValue,
+                                           double highBandDeviationPercentage,
+                                           int highBandPeriodSize) {
+
+    DualBandMetricSeries metricSeries = new DualBandMetricSeries(lowBandValue,
+      lowBandDeviationPercentage,
+      lowBandPeriodSize,
+      highBandValue,
+      highBandDeviationPercentage,
+      highBandPeriodSize);
+
+    return metricSeries.getSeries(n);
+  }
+
+  /**
+   * Returns a step function series.
+   *
+   * @param n                              size of the data series
+   * @param startValue                     start steady value
+   * @param steadyValueDeviationPercentage required deviation in the steady state value
+   * @param steadyPeriodSlope              direction of monotonicity: m > 0 for increasing,
+   *                                       m < 0 for decreasing, m = 0 for no increase or
+   *                                       decrease.
+   * @param steadyPeriodMinSize            min size for step period
+   * @param steadyPeriodMaxSize            max size for step period
+   * @param stepChangePercentage           increase / decrease in steady state to denote a
+   *                                       step, in terms of deviation percentage from the
+   *                                       last value.
+   * @param upwardStep                     upward (true) or downward (false) step.
+   * @return step function series
+   */
+  public static double[] getStepFunctionSeries(int n,
+                                               double startValue,
+                                               double steadyValueDeviationPercentage,
+                                               double steadyPeriodSlope,
+                                               int steadyPeriodMinSize,
+                                               int steadyPeriodMaxSize,
+                                               double stepChangePercentage,
+                                               boolean upwardStep) {
+
+    StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue,
+      steadyValueDeviationPercentage,
+      steadyPeriodSlope,
+      steadyPeriodMinSize,
+      steadyPeriodMaxSize,
+      stepChangePercentage,
+      upwardStep);
+
+    return metricSeries.getSeries(n);
+  }
+
+  /**
+   * Series with a small period of turbulence and then back to steady.
+   *
+   * @param n                                        size of the data series
+   * @param steadyStateValue                         steady state centre value
+   * @param steadyStateDeviationPercentage           steady state deviation in percentage
+   * @param turbulentPeriodDeviationLowerPercentage  turbulent state lower limit in terms of
+   *                                                 percentage from the centre value.
+   * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of
+   *                                                 percentage from the centre value.
+   * @param turbulentPeriodLength                    turbulent period length (number of
+   *                                                 points)
+   * @param turbulentStatePosition                   where the turbulent state should be:
+   *                                                 0 - at the beginning, 1 - in the middle
+   *                                                 (25% - 50% of the series), 2 - at the
+   *                                                 end of the series.
+   * @return steady series containing one turbulent period
+   */
+  public static double[] getSteadySeriesWithTurbulentPeriod(int n,
+                                                            double steadyStateValue,
+                                                            double steadyStateDeviationPercentage,
+                                                            double turbulentPeriodDeviationLowerPercentage,
+                                                            double turbulentPeriodDeviationHigherPercentage,
+                                                            int turbulentPeriodLength,
+                                                            int turbulentStatePosition) {
+
+    SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n,
+      steadyStateValue,
+      steadyStateDeviationPercentage,
+      turbulentPeriodDeviationLowerPercentage,
+      turbulentPeriodDeviationHigherPercentage,
+      turbulentPeriodLength,
+      turbulentStatePosition);
+
+    return metricSeries.getSeries(n);
+  }
+
+
+  /**
+   * Builds a series of the requested type from string configs and returns its first
+   * {@code n} values. Missing config keys fall back to the same defaults as
+   * {@link #generateSeries(String, Map)}.
+   *
+   * @param type    one of "normal", "uniform", "monotonic", "dualband", "step" or
+   *                "turbulence"; any other value (including null) yields a standard
+   *                normal series (mean 0, sd 1, no outliers)
+   * @param n       size of the data series; for "turbulence" it is also the series length
+   *                handed to the generator (the "approxSeriesLength" config is not read)
+   * @param configs generator parameters keyed by parameter name; null is treated as empty
+   * @return the generated series
+   */
+  public static double[] generateSeries(String type, int n, Map<String, String> configs) {
+
+    Map<String, String> settings = nullToEmpty(configs);
+
+    if ("turbulence".equals(type)) {
+      // The only type whose generator needs the series length up front.
+      return getSteadySeriesWithTurbulentPeriod(n,
+        doubleSetting(settings, "steadyStateValue", "10"),
+        doubleSetting(settings, "steadyStateDeviationPercentage", "0"),
+        doubleSetting(settings, "turbulentPeriodDeviationLowerPercentage", "0"),
+        doubleSetting(settings, "turbulentPeriodDeviationHigherPercentage", "10"),
+        intSetting(settings, "turbulentPeriodLength", "0"),
+        intSetting(settings, "turbulentStatePosition", "0"));
+    }
+
+    // Every other type is built identically by the generator overload.
+    return generateSeries(type, settings).getSeries(n);
+  }
+
+  /**
+   * Builds a series generator of the requested type from string configs.
+   *
+   * @param type    one of "normal", "uniform", "monotonic", "dualband", "step" or
+   *                "turbulence"; any other value (including null) yields a standard
+   *                normal series (mean 0, sd 1, no outliers)
+   * @param configs generator parameters keyed by parameter name; missing keys fall back to
+   *                the defaults listed in the code below, null is treated as empty
+   * @return the configured series generator
+   * @throws NumberFormatException if a numeric config value cannot be parsed
+   */
+  public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) {
+
+    Map<String, String> settings = nullToEmpty(configs);
+
+    // A null type cannot be switched on; map it to a value that hits the default branch.
+    switch (type == null ? "" : type) {
+
+      case "normal":
+        return new NormalMetricSeries(
+          doubleSetting(settings, "mean", "0"),
+          doubleSetting(settings, "sd", "1"),
+          doubleSetting(settings, "outlierProbability", "0"),
+          doubleSetting(settings, "outlierDeviationSDTimesLower", "0"),
+          doubleSetting(settings, "outlierDeviationSDTimesHigher", "0"),
+          booleanSetting(settings, "outlierOnRightEnd", "true"));
+
+      case "uniform":
+        return new UniformMetricSeries(
+          doubleSetting(settings, "value", "10"),
+          doubleSetting(settings, "deviationPercentage", "0"),
+          doubleSetting(settings, "outlierProbability", "0"),
+          doubleSetting(settings, "outlierDeviationLowerPercentage", "0"),
+          doubleSetting(settings, "outlierDeviationHigherPercentage", "0"),
+          booleanSetting(settings, "outliersAboveValue", "true"));
+
+      case "monotonic":
+        return new MonotonicMetricSeries(
+          doubleSetting(settings, "startValue", "10"),
+          doubleSetting(settings, "slope", "0"),
+          doubleSetting(settings, "deviationPercentage", "0"),
+          doubleSetting(settings, "outlierProbability", "0"),
+          doubleSetting(settings, "outlierDeviationLowerPercentage", "0"),
+          doubleSetting(settings, "outlierDeviationHigherPercentage", "0"),
+          booleanSetting(settings, "outliersAboveValue", "true"));
+
+      case "dualband":
+        return new DualBandMetricSeries(
+          doubleSetting(settings, "lowBandValue", "10"),
+          doubleSetting(settings, "lowBandDeviationPercentage", "0"),
+          intSetting(settings, "lowBandPeriodSize", "0"),
+          doubleSetting(settings, "highBandValue", "10"),
+          doubleSetting(settings, "highBandDeviationPercentage", "0"),
+          intSetting(settings, "highBandPeriodSize", "0"));
+
+      case "step":
+        return new StepFunctionMetricSeries(
+          doubleSetting(settings, "startValue", "10"),
+          doubleSetting(settings, "steadyValueDeviationPercentage", "0"),
+          doubleSetting(settings, "steadyPeriodSlope", "0"),
+          intSetting(settings, "steadyPeriodMinSize", "0"),
+          intSetting(settings, "steadyPeriodMaxSize", "0"),
+          doubleSetting(settings, "stepChangePercentage", "0"),
+          booleanSetting(settings, "upwardStep", "true"));
+
+      case "turbulence":
+        return new SteadyWithTurbulenceMetricSeries(
+          intSetting(settings, "approxSeriesLength", "100"),
+          doubleSetting(settings, "steadyStateValue", "10"),
+          doubleSetting(settings, "steadyStateDeviationPercentage", "0"),
+          doubleSetting(settings, "turbulentPeriodDeviationLowerPercentage", "0"),
+          doubleSetting(settings, "turbulentPeriodDeviationHigherPercentage", "10"),
+          intSetting(settings, "turbulentPeriodLength", "0"),
+          intSetting(settings, "turbulentStatePosition", "0"));
+
+      default:
+        return new NormalMetricSeries(0,
+          1,
+          0,
+          0,
+          0,
+          true);
+    }
+  }
+
+  /** Substitutes an empty map for a null configs argument. */
+  private static Map<String, String> nullToEmpty(Map<String, String> configs) {
+    if (configs == null) {
+      return java.util.Collections.<String, String>emptyMap();
+    }
+    return configs;
+  }
+
+  /** Reads a config entry as a double, using the given default when the key is absent. */
+  private static double doubleSetting(Map<String, String> settings, String key, String defaultValue) {
+    return Double.parseDouble(settings.getOrDefault(key, defaultValue));
+  }
+
+  /** Reads a config entry as an int, using the given default when the key is absent. */
+  private static int intSetting(Map<String, String> settings, String key, String defaultValue) {
+    return Integer.parseInt(settings.getOrDefault(key, defaultValue));
+  }
+
+  /** Reads a config entry as a boolean, using the given default when the key is absent. */
+  private static boolean booleanSetting(Map<String, String> settings, String key, String defaultValue) {
+    return Boolean.parseBoolean(settings.getOrDefault(key, defaultValue));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
new file mode 100644
index 0000000..a883d08
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series that follows the straight line y = m * x + c,
+ * with uniform noise around the line and, optionally, outliers.
+ *
+ * Every "percentage" setting is multiplied directly with the current line
+ * value y, so it acts as a fraction of y (0.1 shifts a limit by 10% of y).
+ */
+public class MonotonicMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double slope = 0.5;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue = true;
+
+  Random random = new Random();
+  double nonOutlierProbability;
+
+  // Line state: y = m * x + c, where x advances by one per generated value.
+  double y;
+  double m;
+  double x;
+  double c;
+
+  /**
+   * @param startValue line value produced for the first point (x = 1)
+   * @param slope increment of the line value per generated point
+   * @param deviationPercentage half-width of the regular band, as a
+   *                            fraction of y
+   * @param outlierProbability chance that a point is drawn from the
+   *                           outlier band instead of the regular band
+   * @param outlierDeviationLowerPercentage near edge of the outlier band,
+   *                                        as a fraction of y
+   * @param outlierDeviationHigherPercentage far edge of the outlier band,
+   *                                         as a fraction of y
+   * @param outliersAboveValue true to add the outlier offsets to y,
+   *                           false to subtract them
+   */
+  public MonotonicMetricSeries(double startValue,
+                               double slope,
+                               double deviationPercentage,
+                               double outlierProbability,
+                               double outlierDeviationLowerPercentage,
+                               double outlierDeviationHigherPercentage,
+                               boolean outliersAboveValue) {
+    this.startValue = startValue;
+    this.slope = slope;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+
+    // Anchor the line so that x = 1 evaluates to startValue.
+    x = 1;
+    m = slope;
+    y = startValue;
+    c = y - (m * x);
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  /**
+   * Advances the line by one step and returns a value drawn around it.
+   *
+   * @return a value from the regular band, or from the outlier band when
+   *         the first random draw exceeds the non-outlier probability
+   */
+  @Override
+  public double nextValue() {
+    // First draw decides regular vs. outlier; the second draw (inside
+    // uniformBetween) positions the value within the chosen band.
+    double roll = random.nextDouble();
+
+    y = m * x + c;
+    x++;
+
+    if (roll <= nonOutlierProbability) {
+      return uniformBetween(y - deviationPercentage * y,
+        y + deviationPercentage * y);
+    }
+    if (outliersAboveValue) {
+      return uniformBetween(y + outlierDeviationLowerPercentage * y,
+        y + outlierDeviationHigherPercentage * y);
+    }
+    return uniformBetween(y - outlierDeviationHigherPercentage * y,
+      y - outlierDeviationLowerPercentage * y);
+  }
+
+  /**
+   * Generates the next n values of the series.
+   *
+   * @param n number of values to produce
+   * @return array holding n consecutive results of {@link #nextValue()}
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] points = new double[n];
+    int filled = 0;
+    while (filled < n) {
+      points[filled] = nextValue();
+      filled++;
+    }
+    return points;
+  }
+
+  // Linear interpolation between the two limits using one random draw.
+  private double uniformBetween(double from, double to) {
+    return from + (to - from) * random.nextDouble();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
new file mode 100644
index 0000000..cc83d2c
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series whose regular values are Gaussian around a mean
+ * and whose outliers lie a configurable number of standard deviations
+ * away from that mean.
+ */
+public class NormalMetricSeries implements AbstractMetricSeries {
+
+  double mean = 0.0;
+  double sd = 1.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationSDTimesLower = 0.0;
+  double outlierDeviationSDTimesHigher = 0.0;
+  boolean outlierOnRightEnd = true;
+
+  Random random = new Random();
+  double nonOutlierProbability;
+
+
+  /**
+   * @param mean centre of the regular (Gaussian) values
+   * @param sd standard deviation used to scale Gaussian draws and
+   *           outlier offsets
+   * @param outlierProbability chance that a value is an outlier
+   * @param outlierDeviationSDTimesLower smallest outlier distance from
+   *                                     the mean, in multiples of sd
+   * @param outlierDeviationSDTimesHigher largest outlier distance from
+   *                                      the mean, in multiples of sd
+   * @param outlierOnRightEnd true to place outliers above the mean,
+   *                          false to place them below
+   */
+  public NormalMetricSeries(double mean,
+                            double sd,
+                            double outlierProbability,
+                            double outlierDeviationSDTimesLower,
+                            double outlierDeviationSDTimesHigher,
+                            boolean outlierOnRightEnd) {
+    this.mean = mean;
+    this.sd = sd;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower;
+    this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher;
+    this.outlierOnRightEnd = outlierOnRightEnd;
+    nonOutlierProbability = 1.0 - outlierProbability;
+  }
+
+  /**
+   * Draws the next value of the series.
+   *
+   * @return a Gaussian value, or an outlier when the first random draw
+   *         exceeds the non-outlier probability
+   */
+  @Override
+  public double nextValue() {
+    double roll = random.nextDouble();
+
+    if (roll <= nonOutlierProbability) {
+      return random.nextGaussian() * sd + mean;
+    }
+
+    // Outlier distance: a uniform number of standard deviations between
+    // the lower and higher multipliers, then scaled by sd.
+    double sdMultiple = outlierDeviationSDTimesLower
+      + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower)
+      * random.nextDouble();
+    double distance = sdMultiple * sd;
+
+    return outlierOnRightEnd ? mean + distance : mean - distance;
+  }
+
+  /**
+   * Generates the next n values of the series.
+   *
+   * @param n number of values to produce
+   * @return array holding n consecutive results of {@link #nextValue()}
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] points = new double[n];
+    int filled = 0;
+    while (filled < n) {
+      points[filled] = nextValue();
+      filled++;
+    }
+    return points;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
new file mode 100644
index 0000000..c4ed3ba
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series that stays inside a band around a steady value
+ * and contains one "turbulent" stretch whose values are drawn from a
+ * separate band offset from the steady value.
+ *
+ * All "percentage" settings are multiplied directly with steadyStateValue,
+ * so they act as fractions of it (0.1 shifts a limit by 10%).
+ *
+ * turbulentStatePosition selects where the turbulent stretch starts:
+ * 1 picks a random index between 25% and 50% of the series length,
+ * 2 places the stretch at the end of the series, and any other value
+ * starts it at index 0.
+ */
+public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries {
+
+  double steadyStateValue = 0.0;
+  double steadyStateDeviationPercentage = 0.0;
+  double turbulentPeriodDeviationLowerPercentage = 0.3;
+  double turbulentPeriodDeviationHigherPercentage = 0.5;
+  int turbulentPeriodLength = 5;
+  int turbulentStatePosition = 1;
+  int approximateSeriesLength = 10;
+
+  Random random = new Random();
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double tPeriodLowerLimit;
+  double tPeriodUpperLimit;
+  int tPeriodStartIndex = 0;
+  int index = 0;
+
+  /**
+   * @param approximateSeriesLength series length assumed by
+   *        {@link #nextValue()} when positioning the turbulent stretch
+   * @param steadyStateValue centre of the steady band
+   * @param steadyStateDeviationPercentage half-width of the steady band,
+   *        as a fraction of the steady value
+   * @param turbulentPeriodDeviationLowerPercentage lower edge of the
+   *        turbulent band, as a fraction added to the steady value
+   * @param turbulentPeriodDeviationHigherPercentage upper edge of the
+   *        turbulent band, as a fraction added to the steady value
+   * @param turbulentPeriodLength number of turbulent values
+   * @param turbulentStatePosition placement selector (see class comment)
+   */
+  public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength,
+                                          double steadyStateValue,
+                                          double steadyStateDeviationPercentage,
+                                          double turbulentPeriodDeviationLowerPercentage,
+                                          double turbulentPeriodDeviationHigherPercentage,
+                                          int turbulentPeriodLength,
+                                          int turbulentStatePosition) {
+    this.approximateSeriesLength = approximateSeriesLength;
+    this.steadyStateValue = steadyStateValue;
+    this.steadyStateDeviationPercentage = steadyStateDeviationPercentage;
+    this.turbulentPeriodDeviationLowerPercentage =
+      turbulentPeriodDeviationLowerPercentage;
+    this.turbulentPeriodDeviationHigherPercentage =
+      turbulentPeriodDeviationHigherPercentage;
+    this.turbulentPeriodLength = turbulentPeriodLength;
+    this.turbulentStatePosition = turbulentStatePosition;
+    init();
+  }
+
+  // Precomputes the turbulent start index and both bands for nextValue().
+  private void init() {
+    tPeriodStartIndex = turbulenceStartIndex(approximateSeriesLength);
+
+    valueDeviationLowerLimit = steadyStateValue
+      - steadyStateDeviationPercentage * steadyStateValue;
+    valueDeviationHigherLimit = steadyStateValue
+      + steadyStateDeviationPercentage * steadyStateValue;
+
+    tPeriodLowerLimit = steadyStateValue
+      + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    tPeriodUpperLimit = steadyStateValue
+      + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+  }
+
+  // Start index of the turbulent stretch for a series of the given length.
+  private int turbulenceStartIndex(int length) {
+    if (turbulentStatePosition == 1) {
+      return (int) (0.25 * length + (0.25 * length * random.nextDouble()));
+    }
+    if (turbulentStatePosition == 2) {
+      return length - turbulentPeriodLength;
+    }
+    return 0;
+  }
+
+  // Linear interpolation between the two limits using one random draw.
+  private double uniformBetween(double from, double to) {
+    return from + (to - from) * random.nextDouble();
+  }
+
+  /**
+   * Returns the next value of the streaming series, positioned using the
+   * approximate series length given at construction.
+   *
+   * @return a value from the turbulent band while the running index is
+   *         inside the turbulent stretch, otherwise from the steady band
+   */
+  @Override
+  public double nextValue() {
+    // Half-open range [start, start + length): exactly
+    // turbulentPeriodLength turbulent values, matching getSeries(), and
+    // none at all when the length is 0.
+    boolean turbulent = index >= tPeriodStartIndex
+      && index < (tPeriodStartIndex + turbulentPeriodLength);
+
+    double value = turbulent
+      ? uniformBetween(tPeriodLowerLimit, tPeriodUpperLimit)
+      : uniformBetween(valueDeviationLowerLimit, valueDeviationHigherLimit);
+
+    index++;
+    return value;
+  }
+
+  /**
+   * Generates a standalone series of exactly n values. The turbulent
+   * stretch is positioned relative to n, and the running index used by
+   * {@link #nextValue()} is not touched.
+   *
+   * @param n number of values to produce
+   * @return array of n values containing the turbulent stretch
+   */
+  @Override
+  public double[] getSeries(int n) {
+
+    double[] series = new double[n];
+    int turbulentPeriodStartIndex = turbulenceStartIndex(n);
+
+    // Bands are recomputed from the current field values rather than
+    // taken from the limits cached by init().
+    double valueDevLowerLimit = steadyStateValue
+      - steadyStateDeviationPercentage * steadyStateValue;
+    double valueDevHigherLimit = steadyStateValue
+      + steadyStateDeviationPercentage * steadyStateValue;
+
+    double turbulentPeriodLowerLimit = steadyStateValue
+      + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
+    double turbulentPeriodUpperLimit = steadyStateValue
+      + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
+
+    for (int i = 0; i < n; i++) {
+      boolean turbulent = i >= turbulentPeriodStartIndex
+        && i < (turbulentPeriodStartIndex + turbulentPeriodLength);
+      series[i] = turbulent
+        ? uniformBetween(turbulentPeriodLowerLimit, turbulentPeriodUpperLimit)
+        : uniformBetween(valueDevLowerLimit, valueDevHigherLimit);
+    }
+
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
new file mode 100644
index 0000000..d5beb48
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series made of consecutive "steady" periods. Within a
+ * period the values follow the line y = m * x + c with uniform noise; at
+ * the end of each period the line value jumps up or down by a fraction of
+ * itself and a new period of random length begins.
+ *
+ * The "percentage" settings are multiplied directly with y, so they act
+ * as fractions of y (0.1 means 10% of y).
+ */
+public class StepFunctionMetricSeries implements AbstractMetricSeries {
+
+  double startValue = 0.0;
+  double steadyValueDeviationPercentage = 0.0;
+  double steadyPeriodSlope = 0.5;
+  int steadyPeriodMinSize = 10;
+  int steadyPeriodMaxSize = 20;
+  double stepChangePercentage = 0.0;
+  boolean upwardStep = true;
+
+  Random random = new Random();
+
+  // Line state: y = m * x + c, where x restarts at 1 for every period.
+  double y;
+  double m;
+  double x;
+  double c;
+  int currentStepSize;
+  int currentIndex;
+
+  /**
+   * @param startValue line value of the first point of the first period
+   * @param steadyValueDeviationPercentage half-width of the noise band,
+   *        as a fraction of y
+   * @param steadyPeriodSlope increment of the line value per point
+   *        within a period
+   * @param steadyPeriodMinSize smallest period length; drawn lengths
+   *        below 1 are raised to 1
+   * @param steadyPeriodMaxSize upper bound used when drawing a period
+   *        length
+   * @param stepChangePercentage size of the jump between periods, as a
+   *        fraction of the last line value
+   * @param upwardStep true to jump upwards, false to jump downwards
+   */
+  public StepFunctionMetricSeries(double startValue,
+                                  double steadyValueDeviationPercentage,
+                                  double steadyPeriodSlope,
+                                  int steadyPeriodMinSize,
+                                  int steadyPeriodMaxSize,
+                                  double stepChangePercentage,
+                                  boolean upwardStep) {
+    this.startValue = startValue;
+    this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
+    this.steadyPeriodSlope = steadyPeriodSlope;
+    this.steadyPeriodMinSize = steadyPeriodMinSize;
+    this.steadyPeriodMaxSize = steadyPeriodMaxSize;
+    this.stepChangePercentage = stepChangePercentage;
+    this.upwardStep = upwardStep;
+    init();
+  }
+
+  // Anchors the line at startValue for x = 1 and draws the first period.
+  private void init() {
+    y = startValue;
+    m = steadyPeriodSlope;
+    x = 1;
+    c = y - (m * x);
+
+    currentStepSize = nextStepSize();
+    currentIndex = 0;
+  }
+
+  // Draws a period length between the configured min and max sizes.
+  // The result is never below 1: a period of zero (or negative) length
+  // would produce no real value, so nextValue() would have nothing on
+  // the line to return for that call.
+  private int nextStepSize() {
+    int drawn = (int) (steadyPeriodMinSize
+      + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
+    return Math.max(1, drawn);
+  }
+
+  /**
+   * Returns the next value of the current period and, when that value
+   * was the last one of the period, applies the step and starts a new
+   * period.
+   *
+   * @return a value drawn uniformly from the noise band around the
+   *         current line value
+   */
+  @Override
+  public double nextValue() {
+
+    // currentStepSize is at least 1 and currentIndex is reset whenever it
+    // reaches the period length, so every call emits a value on the line.
+    y = m * x + c;
+    double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
+    double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
+    double value = valueDeviationLowerLimit
+      + (valueDeviationHigherLimit - valueDeviationLowerLimit)
+      * random.nextDouble();
+    x++;
+    currentIndex++;
+
+    if (currentIndex >= currentStepSize) {
+      currentIndex = 0;
+      currentStepSize = nextStepSize();
+      if (upwardStep) {
+        y = y + stepChangePercentage * y;
+      } else {
+        y = y - stepChangePercentage * y;
+      }
+      // Re-anchor the line so the next period starts at the stepped value.
+      x = 1;
+      c = y - (m * x);
+    }
+
+    return value;
+  }
+
+  /**
+   * Generates the next n values of the series.
+   *
+   * @param n number of values to produce
+   * @return array holding n consecutive results of {@link #nextValue()}
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] series = new double[n];
+    for (int i = 0; i < n; i++) {
+      series[i] = nextValue();
+    }
+    return series;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
new file mode 100644
index 0000000..a2b0eea
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import java.util.Random;
+
+/**
+ * Synthetic metric series whose regular values are uniformly distributed
+ * in a band around a fixed value, with optional outliers drawn from a
+ * second band above or below that value.
+ *
+ * The "percentage" settings are multiplied directly with the value, so
+ * they act as fractions of it (0.1 means 10% of the value).
+ */
+public class UniformMetricSeries implements AbstractMetricSeries {
+
+  double value = 0.0;
+  double deviationPercentage = 0.0;
+  double outlierProbability = 0.0;
+  double outlierDeviationLowerPercentage = 0.0;
+  double outlierDeviationHigherPercentage = 0.0;
+  boolean outliersAboveValue = true;
+
+  Random random = new Random();
+  double valueDeviationLowerLimit;
+  double valueDeviationHigherLimit;
+  double outlierLeftLowerLimit;
+  double outlierLeftHigherLimit;
+  double outlierRightLowerLimit;
+  double outlierRightUpperLimit;
+  double nonOutlierProbability;
+
+
+  /**
+   * @param value centre of the regular band
+   * @param deviationPercentage half-width of the regular band, as a
+   *                            fraction of the value
+   * @param outlierProbability chance that a point is an outlier
+   * @param outlierDeviationLowerPercentage near edge of the outlier band,
+   *                                        as a fraction of the value
+   * @param outlierDeviationHigherPercentage far edge of the outlier band,
+   *                                         as a fraction of the value
+   * @param outliersAboveValue true to use the band above the value,
+   *                           false to use the band below it
+   */
+  public UniformMetricSeries(double value,
+                             double deviationPercentage,
+                             double outlierProbability,
+                             double outlierDeviationLowerPercentage,
+                             double outlierDeviationHigherPercentage,
+                             boolean outliersAboveValue) {
+    this.value = value;
+    this.deviationPercentage = deviationPercentage;
+    this.outlierProbability = outlierProbability;
+    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
+    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
+    this.outliersAboveValue = outliersAboveValue;
+    init();
+  }
+
+  // Precomputes the regular band and both outlier bands from the settings.
+  private void init() {
+    nonOutlierProbability = 1.0 - outlierProbability;
+
+    valueDeviationLowerLimit = value - deviationPercentage * value;
+    valueDeviationHigherLimit = value + deviationPercentage * value;
+
+    outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value;
+    outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value;
+    outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value;
+    outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value;
+  }
+
+  /**
+   * Draws the next value of the series.
+   *
+   * @return a value from the regular band, or from the selected outlier
+   *         band when the first random draw exceeds the non-outlier
+   *         probability
+   */
+  @Override
+  public double nextValue() {
+    // First draw picks the band; the second positions the value in it.
+    double roll = random.nextDouble();
+
+    double from;
+    double to;
+    if (roll <= nonOutlierProbability) {
+      from = valueDeviationLowerLimit;
+      to = valueDeviationHigherLimit;
+    } else if (outliersAboveValue) {
+      from = outlierRightLowerLimit;
+      to = outlierRightUpperLimit;
+    } else {
+      from = outlierLeftLowerLimit;
+      to = outlierLeftHigherLimit;
+    }
+    return from + (to - from) * random.nextDouble();
+  }
+
+  /**
+   * Generates the next n values of the series.
+   *
+   * @param n number of values to produce
+   * @return array holding n consecutive results of {@link #nextValue()}
+   */
+  @Override
+  public double[] getSeries(int n) {
+    double[] points = new double[n];
+    int filled = 0;
+    while (filled < n) {
+      points[filled] = nextValue();
+      filled++;
+    }
+    return points;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
deleted file mode 100644
index daaee5c..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.kafka.clients.producer.*;
-
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-public class AmsKafkaProducer {
-
-    Producer producer;
-    private static String topicName = "ambari-metrics-topic";
-
-    public AmsKafkaProducer(String kafkaServers) {
-        Properties configProperties = new Properties();
-        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
kafkaServers); //"avijayan-ams-2.openstacklocal:6667"
-        
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
-        
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
-        producer = new KafkaProducer(configProperties);
-    }
-
-    public void sendMetrics(TimelineMetrics timelineMetrics) throws 
InterruptedException, ExecutionException {
-
-        ObjectMapper objectMapper = new ObjectMapper();
-        JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics);
-        ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, 
JsonNode>(topicName,jsonNode);
-        Future<RecordMetadata> kafkaFuture =  producer.send(rec);
-
-        System.out.println(kafkaFuture.isDone());
-        System.out.println(kafkaFuture.get().topic());
-    }
-
-    public static void main(String[] args) throws ExecutionException, 
InterruptedException {
-        final long now = System.currentTimeMillis();
-
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        TimelineMetric metric1 = new TimelineMetric();
-        metric1.setMetricName("mem_free");
-        metric1.setHostName("avijayan-ams-3.openstacklocal");
-        metric1.setTimestamp(now);
-        metric1.setStartTime(now - 1000);
-        metric1.setAppId("HOST");
-        metric1.setType("Integer");
-
-        TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-
-        for (int i = 0; i<20;i++) {
-            double metric = 20000 + Math.random();
-            metricValues.put(now - i*100, metric);
-        }
-
-        metric1.setMetricValues(metricValues);
-
-//        metric1.setMetricValues(new TreeMap<Long, Double>() {{
-//            put(now - 100, 1.20);
-//            put(now - 200, 11.25);
-//            put(now - 300, 1.30);
-//            put(now - 400, 4.50);
-//            put(now - 500, 16.35);
-//            put(now - 400, 5.50);
-//        }});
-
-        timelineMetrics.getMetrics().add(metric1);
-
-        for (int i = 0; i<1; i++) {
-            new 
AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667").sendMetrics(timelineMetrics);
-            Thread.sleep(1000);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
deleted file mode 100644
index d65790e..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.*;
-
-public class AnomalyMetricPublisher implements Serializable {
-
-    private String hostName = "UNKNOWN.example.com";
-    private String instanceId = null;
-    private String serviceName = "anomaly-engine";
-    private String collectorHost;
-    private String protocol;
-    private String port;
-    private static final String WS_V1_TIMELINE_METRICS = 
"/ws/v1/timeline/metrics";
-    private static final Log LOG = 
LogFactory.getLog(AnomalyMetricPublisher.class);
-    private static ObjectMapper mapper;
-
-    static {
-        mapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.getSerializationConfig()
-                .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-    }
-
-    public AnomalyMetricPublisher(String collectorHost, String protocol, 
String port) {
-        this.collectorHost = collectorHost;
-        this.protocol = protocol;
-        this.port = port;
-        this.hostName = getDefaultLocalHostName();
-    }
-
-    private String getDefaultLocalHostName() {
-        try {
-            return InetAddress.getLocalHost().getCanonicalHostName();
-        } catch (UnknownHostException e) {
-            LOG.info("Error getting host address");
-        }
-        return null;
-    }
-
-    public void publish(List<MetricAnomaly> metricAnomalies) {
-        LOG.info("Sending metric anomalies of size : " + 
metricAnomalies.size());
-        List<TimelineMetric> metricList = 
getTimelineMetricList(metricAnomalies);
-        if (!metricList.isEmpty()) {
-            TimelineMetrics timelineMetrics = new TimelineMetrics();
-            timelineMetrics.setMetrics(metricList);
-            emitMetrics(timelineMetrics);
-        }
-    }
-
-    private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> 
metricAnomalies) {
-        List<TimelineMetric> metrics = new ArrayList<>();
-
-        if (metricAnomalies.isEmpty()) {
-            return metrics;
-        }
-
-        long currentTime = System.currentTimeMillis();
-        MetricAnomaly prevAnomaly = metricAnomalies.get(0);
-
-        TimelineMetric timelineMetric = new TimelineMetric();
-        timelineMetric.setMetricName(prevAnomaly.getMetricKey() + "_" + 
prevAnomaly.getMethodResult().getMethodType());
-        timelineMetric.setAppId(serviceName);
-        timelineMetric.setInstanceId(instanceId);
-        timelineMetric.setHostName(hostName);
-        timelineMetric.setStartTime(currentTime);
-
-        TreeMap<Long,Double> metricValues = new TreeMap<>();
-        metricValues.put(prevAnomaly.getTimestamp(), 
prevAnomaly.getMetricValue());
-        MetricAnomaly currentAnomaly;
-
-        for (int i = 1; i < metricAnomalies.size(); i++) {
-            currentAnomaly = metricAnomalies.get(i);
-            if 
(currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) {
-                metricValues.put(currentAnomaly.getTimestamp(), 
currentAnomaly.getMetricValue());
-            } else {
-                timelineMetric.setMetricValues(metricValues);
-                metrics.add(timelineMetric);
-
-                timelineMetric = new TimelineMetric();
-                timelineMetric.setMetricName(currentAnomaly.getMetricKey() + 
"_" + currentAnomaly.getMethodResult().getMethodType());
-                timelineMetric.setAppId(serviceName);
-                timelineMetric.setInstanceId(instanceId);
-                timelineMetric.setHostName(hostName);
-                timelineMetric.setStartTime(currentTime);
-                metricValues = new TreeMap<>();
-                metricValues.put(currentAnomaly.getTimestamp(), 
currentAnomaly.getMetricValue());
-                prevAnomaly = currentAnomaly;
-            }
-        }
-
-        timelineMetric.setMetricValues(metricValues);
-        metrics.add(timelineMetric);
-        return metrics;
-    }
-
-    private boolean emitMetrics(TimelineMetrics metrics) {
-        String connectUrl = constructTimelineMetricUri();
-        String jsonData = null;
-        LOG.info("EmitMetrics connectUrl = "  + connectUrl);
-        try {
-            jsonData = mapper.writeValueAsString(metrics);
-        } catch (IOException e) {
-            LOG.error("Unable to parse metrics", e);
-        }
-        if (jsonData != null) {
-            return emitMetricsJson(connectUrl, jsonData);
-        }
-        return false;
-    }
-
-    private HttpURLConnection getConnection(String spec) throws IOException {
-        return (HttpURLConnection) new URL(spec).openConnection();
-    }
-
-    private boolean emitMetricsJson(String connectUrl, String jsonData) {
-        int timeout = 10000;
-        HttpURLConnection connection = null;
-        try {
-            if (connectUrl == null) {
-                throw new IOException("Unknown URL. Unable to connect to 
metrics collector.");
-            }
-            connection = getConnection(connectUrl);
-
-            connection.setRequestMethod("POST");
-            connection.setRequestProperty("Content-Type", "application/json");
-            connection.setRequestProperty("Connection", "Keep-Alive");
-            connection.setConnectTimeout(timeout);
-            connection.setReadTimeout(timeout);
-            connection.setDoOutput(true);
-
-            if (jsonData != null) {
-                try (OutputStream os = connection.getOutputStream()) {
-                    os.write(jsonData.getBytes("UTF-8"));
-                }
-            }
-
-            int statusCode = connection.getResponseCode();
-
-            if (statusCode != 200) {
-                LOG.info("Unable to POST metrics to collector, " + connectUrl 
+ ", " +
-                        "statusCode = " + statusCode);
-            } else {
-                LOG.info("Metrics posted to Collector " + connectUrl);
-            }
-            return true;
-        } catch (IOException ioe) {
-            LOG.error(ioe.getMessage());
-        }
-        return false;
-    }
-
-    private String constructTimelineMetricUri() {
-        StringBuilder sb = new StringBuilder(protocol);
-        sb.append("://");
-        sb.append(collectorHost);
-        sb.append(":");
-        sb.append(port);
-        sb.append(WS_V1_TIMELINE_METRICS);
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
deleted file mode 100644
index 3989c67..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.spark;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.common.MetricAnomaly;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric;
-import org.apache.ambari.metrics.alertservice.common.TimelineMetrics;
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel;
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel;
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
-
-import java.util.*;
-
-public class MetricAnomalyDetector {
-
-    private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class);
-    private static String groupId = "ambari-metrics-group";
-    private static String topicName = "ambari-metrics-topic";
-    private static int numThreads = 1;
-
-    //private static String zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181";
-    //private static Map<String, String> kafkaParams = new HashMap<>();
-    //static {
-    //    kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "avijayan-ams-2.openstacklocal:6667");
-    //    kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-    //    kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
-    //    kafkaParams.put("metadata.broker.list", "avijayan-ams-2.openstacklocal:6667");
-    //}
-
-    public MetricAnomalyDetector() {
-    }
-
-    public static void main(String[] args) throws InterruptedException {
-
-
-        if (args.length < 6) {
-            System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>");
-            System.exit(1);
-        }
-
-        List<String> appIds = Arrays.asList(args[1].split(","));
-        String collectorHost = args[2];
-        String collectorPort = args[3];
-        String collectorProtocol = args[4];
-        String zkQuorum = args[5];
-
-        List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>();
-        AnomalyMetricPublisher anomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort);
-
-        SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
-
-        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
-
-        for (String method : args[0].split(",")) {
-            if (method.equals("ema")) {
-                LOG.info("Model EMA requested.");
-                EmaModel emaModel = new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema");
-                anomalyDetectionModels.add(emaModel);
-            }
-        }
-
-        JavaPairReceiverInputDStream<String, String> messages =
-                KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads));
-
-        //Convert JSON string to TimelineMetrics.
-        JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
-            @Override
-            public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
-                ObjectMapper mapper = new ObjectMapper();
-                TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
-                return metrics;
-            }
-        });
-
-        //Group TimelineMetric by AppId.
-        JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
-                timelineMetrics -> new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(),timelineMetrics)
-        );
-
-        appMetricStream.print();
-
-        //Filter AppIds that are not needed.
-        JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
-            @Override
-            public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
-                return appIds.contains(appMetricTuple._1);
-            }
-        });
-
-        filteredAppMetricStream.print();
-
-        filteredAppMetricStream.foreachRDD(rdd -> {
-            rdd.foreach(
-                    tuple2 -> {
-                        TimelineMetrics metrics = tuple2._2();
-                        for (TimelineMetric metric : metrics.getMetrics()) {
-
-                            TimelineMetric timelineMetric =
-                                    new TimelineMetric(metric.getMetricName(), metric.getAppId(), metric.getHostName(), metric.getMetricValues());
-
-                            for (MetricAnomalyModel model : anomalyDetectionModels) {
-                                List<MetricAnomaly> anomalies = model.test(timelineMetric);
-                                anomalyMetricPublisher.publish(anomalies);
-                                for (MetricAnomaly anomaly : anomalies) {
-                                    LOG.info(anomaly.getAnomalyAsString());
-                                }
-
-                            }
-                        }
-                    });
-        });
-
-        jssc.start();
-        jssc.awaitTermination();
-    }
-}
-
-
-
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
index b25e79d..bca3366 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
@@ -25,12 +25,9 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
   granularity <- train_data[2,1] - train_data[1,1]
   test_start <- test_data[1,1]
   test_end <- test_data[length(test_data[1,]),1]
-  cat ("\n test_start : ", as.numeric(test_start))
   train_start <- test_start - num_historic_periods*period
-  cat ("\n train_start : ", as.numeric(train_start))
   # round to start of day
   train_start <- train_start - (train_start %% interval)
-  cat ("\n train_start after rounding: ", as.numeric(train_start))
 
   time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT")
   test_data_day <- time$wday
@@ -39,7 +36,6 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
   for ( i in length(train_data[,1]):1) {
     ts <- train_data[i,1]
     if ( ts < train_start) {
-      cat ("\n Breaking out of loop : ", ts)
       break
     }
     time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
@@ -49,20 +45,14 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
     }
   }
 
-  cat ("\n Train data length : ", length(train_data[,1]))
-  cat ("\n Test data length : ", length(test_data[,1]))
-  cat ("\n Historic data length : ", length(h_data))
   if (length(h_data) < 2*length(test_data[,1])) {
     cat ("\nNot enough training data")
     return (anomalies)
   }
 
   past_median <- median(h_data)
-  cat ("\npast_median : ", past_median)
   past_sd <- sd(h_data)
-  cat ("\npast_sd : ", past_sd)
   curr_median <- median(test_data[,2])
-  cat ("\ncurr_median : ", curr_median)
 
   if (abs(curr_median - past_median) > n * past_sd) {
     anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
@@ -70,7 +60,7 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval
   }
 
   if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD")
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
   }
 
   return (anomalies)

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
index b4dfdcb..f22bc15 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
@@ -24,7 +24,7 @@ ams_ks <- function(train_data, test_data, p_value) {
 #  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2]
   
   anomalies <- data.frame()
-  res <- ks.test(train_data, test_data[,2])
+  res <- ks.test(train_data[,2], test_data[,2])
   
   if (res[2] < p_value) {
     anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2])

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
index 7fffbdd..f33b6ec 100644
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
@@ -32,12 +32,17 @@ ams_tukeys <- function(train_data, test_data, n) {
     lb <- quantiles[2] - n*iqr
     ub <- quantiles[4] + n*iqr
     if ( (x < lb)  || (x > ub) ) {
-      anomaly <- c(test_data[i,1], x)
+      if (x < lb) {
+        niqr <- (quantiles[2] - x) / iqr
+      } else {
+        niqr <- (x - quantiles[4]) / iqr
+      }
+      anomaly <- c(test_data[i,1], x, niqr)
       anomalies <- rbind(anomalies, anomaly)
     }
   }
   if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS", "Value")
+    names(anomalies) <- c("TS", "Value", "niqr")
   }
   return (anomalies)
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
deleted file mode 100644
index 3827006..0000000
--- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?'
-#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES'
-#data_url <- paste(url_prefix, query, sep ="")
-#data_url <- paste(data_url, url_suffix, sep="")
-
-get_data <- function(url) {
-  library(rjson)
-  res <- fromJSON(readLines(url)[1])
-  return (res)
-}
-
-find_index <- function(data, ts) {
-  for (i in 1:length(data)) {
-    if (as.numeric(ts) == as.numeric(data[i])) {
-      return (i)
-    }
-  }
-  return (-1)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
new file mode 100644
index 0000000..539ca40
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.TreeMap;
+
+public class TestEmaTechnique {
+
+  @Test
+  public void testEmaInitialization() {
+
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+    Assert.assertTrue(ema.getTrackedEmas().isEmpty());
+    Assert.assertTrue(ema.getStartingWeight() == 0.5);
+    Assert.assertTrue(ema.getStartTimesSdev() == 3);
+  }
+
+  @Test
+  public void testEma() {
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+
+    long now = System.currentTimeMillis();
+
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("M1");
+    metric1.setHostName("H1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("A1");
+    metric1.setInstanceId(null);
+    metric1.setType("Integer");
+
+    //Train
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    List<MetricAnomaly> anomalyList = ema.test(metric1);
+//    Assert.assertTrue(anomalyList.isEmpty());
+
+    metricValues = new TreeMap<Long, Double>();
+    for (int i = 0; i < 50; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 100, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    anomalyList = ema.test(metric1);
+    Assert.assertTrue(!anomalyList.isEmpty());
+    int l1 = anomalyList.size();
+
+    Assert.assertTrue(ema.updateModel(metric1, false, 20));
+    anomalyList = ema.test(metric1);
+    int l2 = anomalyList.size();
+    Assert.assertTrue(l2 < l1);
+
+    Assert.assertTrue(ema.updateModel(metric1, true, 50));
+    anomalyList = ema.test(metric1);
+    int l3 = anomalyList.size();
+    Assert.assertTrue(l3 > l2 && l3 > l1);
+
+  }
+}

Reply via email to