http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java new file mode 100644 index 0000000..ff8dbcf --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.alertservice.prototype.methods.kstest; + +import org.apache.ambari.metrics.alertservice.prototype.RFunctionInvoker; +import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class KSTechnique implements Serializable { + + private String methodType = "ks"; + private Map<String, Double> pValueMap; + private static final Log LOG = LogFactory.getLog(KSTechnique.class); + + public KSTechnique() { + pValueMap = new HashMap(); + } + + public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) { + + int testLength = testData.values.length; + int trainLength = trainData.values.length; + + if (trainLength < testLength) { + LOG.info("Not enough train data."); + return null; + } + + if (!pValueMap.containsKey(key)) { + pValueMap.put(key, 0.05); + } + double pValue = pValueMap.get(key); + + ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue))); + if (result == null) { + LOG.error("Resultset is null when invoking KS R function..."); + return null; + } + + if (result.resultset.size() > 0) { + + LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length); + LOG.info("p_value = " + result.resultset.get(3)[0]); + double dValue = result.resultset.get(2)[0]; + + return new MetricAnomaly(key, + (long) testData.ts[testLength - 1], + testData.values[testLength - 1], + methodType, + dValue); + } + + return null; + } + + public void updateModel(String metricKey, boolean increaseSensitivity, double percent) { + + LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); + + if (!pValueMap.containsKey(metricKey)) { + LOG.error("Unknown metric key : " + metricKey); + LOG.info("pValueMap :" + pValueMap.toString()); + return; + } + + double delta = percent / 100; + if (!increaseSensitivity) { + delta = delta * -1; + } + + double pValue = pValueMap.get(metricKey); + double newPValue = Math.min(1.0, pValue + delta * pValue); + pValueMap.put(metricKey, newPValue); + LOG.info("New pValue = " + newPValue); + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java new file mode 100644 index 0000000..a8e31bf --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +public interface AbstractMetricSeries { + + public double nextValue(); + public double[] getSeries(int n); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java new file mode 100644 index 0000000..4158ff4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/DualBandMetricSeries.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class DualBandMetricSeries implements AbstractMetricSeries { + + double lowBandValue = 0.0; + double lowBandDeviationPercentage = 0.0; + int lowBandPeriodSize = 10; + double highBandValue = 1.0; + double highBandDeviationPercentage = 0.0; + int highBandPeriodSize = 10; + + Random random = new Random(); + double lowBandValueLowerLimit, lowBandValueHigherLimit; + double highBandLowerLimit, highBandUpperLimit; + int l = 0, h = 0; + + public DualBandMetricSeries(double lowBandValue, + double lowBandDeviationPercentage, + int lowBandPeriodSize, + double highBandValue, + double highBandDeviationPercentage, + int highBandPeriodSize) { + this.lowBandValue = lowBandValue; + this.lowBandDeviationPercentage = lowBandDeviationPercentage; + this.lowBandPeriodSize = lowBandPeriodSize; + this.highBandValue = highBandValue; + this.highBandDeviationPercentage = highBandDeviationPercentage; + this.highBandPeriodSize = highBandPeriodSize; + init(); + } + + private void init() { + lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue; + lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue; + highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue; + highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue; + } + + @Override + public double nextValue() { + + double value = 0.0; + + if (l < lowBandPeriodSize) { + value = lowBandValueLowerLimit + (lowBandValueHigherLimit - lowBandValueLowerLimit) * random.nextDouble(); + l++; + } else if (h < highBandPeriodSize) { + value = highBandLowerLimit + (highBandUpperLimit - highBandLowerLimit) * random.nextDouble(); + h++; + } + + if (l == lowBandPeriodSize && h == highBandPeriodSize) { + l = 0; + h = 0; + } + + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java new file mode 100644 index 0000000..1e37ff3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Arrays; +import java.util.Map; +import java.util.Random; + +public class MetricSeriesGeneratorFactory { + + /** + * Return a normally distributed data series with some deviation % and outliers. + * + * @param n size of the data series + * @param value The value around which the uniform data series is centered on. + * @param deviationPercentage The allowed deviation % on either side of the uniform value. For example, if value = 10, and deviation % is 0.1, the series values lie between 0.9 to 1.1. + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationLowerPercentage min percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13. + * @param outlierDeviationHigherPercentage max percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16. + * @param outliersAboveValue Outlier should be greater or smaller than the value. + * @return uniform series + */ + public static double[] createUniformSeries(int n, + double value, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + + UniformMetricSeries metricSeries = new UniformMetricSeries(value, + deviationPercentage, + outlierProbability, + outlierDeviationLowerPercentage, + outlierDeviationHigherPercentage, + outliersAboveValue); + + return metricSeries.getSeries(n); + } + + + /** + * /** + * Returns a normally distributed series. + * + * @param n size of the data series + * @param mean mean of the distribution + * @param sd sd of the distribution + * @param outlierProbability sd of the distribution + * @param outlierDeviationSDTimesLower Lower Limit of the outlier with respect to times sdev from the mean. + * @param outlierDeviationSDTimesHigher Higher Limit of the outlier with respect to times sdev from the mean. + * @param outlierOnRightEnd Outlier should be on the right end or the left end. + * @return normal series + */ + public static double[] createNormalSeries(int n, + double mean, + double sd, + double outlierProbability, + double outlierDeviationSDTimesLower, + double outlierDeviationSDTimesHigher, + boolean outlierOnRightEnd) { + + + NormalMetricSeries metricSeries = new NormalMetricSeries(mean, + sd, + outlierProbability, + outlierDeviationSDTimesLower, + outlierDeviationSDTimesHigher, + outlierOnRightEnd); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a monotonically increasing / decreasing series + * + * @param n size of the data series + * @param startValue Start value of the monotonic sequence + * @param slope direction of monotonicity m > 0 for increasing and m < 0 for decreasing. + * @param deviationPercentage The allowed deviation % on either side of the current 'y' value. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 0.9 to 1.1. + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationLowerPercentage min percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13. + * @param outlierDeviationHigherPercentage max percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16. + * @param outliersAboveValue Outlier should be greater or smaller than the 'y' value. + * @return + */ + public static double[] createMonotonicSeries(int n, + double startValue, + double slope, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + + MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue, + slope, + deviationPercentage, + outlierProbability, + outlierDeviationLowerPercentage, + outlierDeviationHigherPercentage, + outliersAboveValue); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a dual band series (lower and higher) + * + * @param n size of the data series + * @param lowBandValue lower band value + * @param lowBandDeviationPercentage lower band deviation + * @param lowBandPeriodSize lower band + * @param highBandValue high band centre value + * @param highBandDeviationPercentage high band deviation. + * @param highBandPeriodSize high band size + * @return + */ + public static double[] getDualBandSeries(int n, + double lowBandValue, + double lowBandDeviationPercentage, + int lowBandPeriodSize, + double highBandValue, + double highBandDeviationPercentage, + int highBandPeriodSize) { + + DualBandMetricSeries metricSeries = new DualBandMetricSeries(lowBandValue, + lowBandDeviationPercentage, + lowBandPeriodSize, + highBandValue, + highBandDeviationPercentage, + highBandPeriodSize); + + return metricSeries.getSeries(n); + } + + /** + * Returns a step function series. + * + * @param n size of the data series + * @param startValue start steady value + * @param steadyValueDeviationPercentage required devation in the steady state value + * @param steadyPeriodSlope direction of monotonicity m > 0 for increasing and m < 0 for decreasing, m = 0 no increase or decrease. + * @param steadyPeriodMinSize min size for step period + * @param steadyPeriodMaxSize max size for step period. + * @param stepChangePercentage Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value. + * @param upwardStep upward or downward step. + * @return + */ + public static double[] getStepFunctionSeries(int n, + double startValue, + double steadyValueDeviationPercentage, + double steadyPeriodSlope, + int steadyPeriodMinSize, + int steadyPeriodMaxSize, + double stepChangePercentage, + boolean upwardStep) { + + StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue, + steadyValueDeviationPercentage, + steadyPeriodSlope, + steadyPeriodMinSize, + steadyPeriodMaxSize, + stepChangePercentage, + upwardStep); + + return metricSeries.getSeries(n); + } + + /** + * Series with small period of turbulence and then back to steady. + * + * @param n size of the data series + * @param steadyStateValue steady state center value + * @param steadyStateDeviationPercentage steady state deviation in percentage + * @param turbulentPeriodDeviationLowerPercentage turbulent state lower limit in terms of percentage from centre value. + * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value. + * @param turbulentPeriodLength turbulent period length (number of points) + * @param turbulentStatePosition Where the turbulent state should be 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series. + * @return + */ + public static double[] getSteadySeriesWithTurbulentPeriod(int n, + double steadyStateValue, + double steadyStateDeviationPercentage, + double turbulentPeriodDeviationLowerPercentage, + double turbulentPeriodDeviationHigherPercentage, + int turbulentPeriodLength, + int turbulentStatePosition + ) { + + + SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n, + steadyStateValue, + steadyStateDeviationPercentage, + turbulentPeriodDeviationLowerPercentage, + turbulentPeriodDeviationHigherPercentage, + turbulentPeriodLength, + turbulentStatePosition); + + return metricSeries.getSeries(n); + } + + + public static double[] generateSeries(String type, int n, Map<String, String> configs) { + + double[] series; + switch (type) { + + case "normal": + series = createNormalSeries(n, + Double.parseDouble(configs.getOrDefault("mean", "0")), + Double.parseDouble(configs.getOrDefault("sd", "1")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")), + Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true"))); + break; + + case "uniform": + series = createUniformSeries(n, + Double.parseDouble(configs.getOrDefault("value", "10")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "monotonic": + series = createMonotonicSeries(n, + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("slope", "0")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "dualband": + series = getDualBandSeries(n, + Double.parseDouble(configs.getOrDefault("lowBandValue", "10")), + Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")), + Double.parseDouble(configs.getOrDefault("highBandValue", "10")), + Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0"))); + break; + + case "step": + series = getStepFunctionSeries(n, + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")), + Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true"))); + break; + + case "turbulence": + series = getSteadySeriesWithTurbulentPeriod(n, + Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")), + Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")), + Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0"))); + break; + + default: + series = createNormalSeries(n, + 0, + 1, + 0, + 0, + 0, + true); + } + return series; + } + + public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) { + + AbstractMetricSeries series; + switch (type) { + + case "normal": + series = new NormalMetricSeries(Double.parseDouble(configs.getOrDefault("mean", "0")), + Double.parseDouble(configs.getOrDefault("sd", "1")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")), + Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true"))); + break; + + case "uniform": + series = new UniformMetricSeries( + Double.parseDouble(configs.getOrDefault("value", "10")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "monotonic": + series = new MonotonicMetricSeries( + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("slope", "0")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "dualband": + series = new DualBandMetricSeries( + Double.parseDouble(configs.getOrDefault("lowBandValue", "10")), + Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")), + Double.parseDouble(configs.getOrDefault("highBandValue", "10")), + Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0"))); + break; + + case "step": + series = new StepFunctionMetricSeries( + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")), + Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true"))); + break; + + case "turbulence": + series = new SteadyWithTurbulenceMetricSeries( + Integer.parseInt(configs.getOrDefault("approxSeriesLength", "100")), + Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")), + Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")), + Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0"))); + break; + + default: + series = new NormalMetricSeries(0, + 1, + 0, + 0, + 0, + true); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java new file mode 100644 index 0000000..a883d08 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class MonotonicMetricSeries implements AbstractMetricSeries { + + double startValue = 0.0; + double slope = 0.5; + double deviationPercentage = 0.0; + double outlierProbability = 0.0; + double outlierDeviationLowerPercentage = 0.0; + double outlierDeviationHigherPercentage = 0.0; + boolean outliersAboveValue = true; + + Random random = new Random(); + double nonOutlierProbability; + + // y = mx + c + double y; + double m; + double x; + double c; + + public MonotonicMetricSeries(double startValue, + double slope, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + this.startValue = startValue; + this.slope = slope; + this.deviationPercentage = deviationPercentage; + this.outlierProbability = outlierProbability; + this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage; + this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage; + this.outliersAboveValue = outliersAboveValue; + init(); + } + + private void init() { + y = startValue; + m = slope; + x = 1; + c = y - (m * x); + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + y = m * x + c; + if (probability <= nonOutlierProbability) { + double valueDeviationLowerLimit = y - deviationPercentage * y; + double valueDeviationHigherLimit = y + deviationPercentage * y; + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + } else { + if (outliersAboveValue) { + double outlierLowerLimit = y + outlierDeviationLowerPercentage * y; + double outlierUpperLimit = y + outlierDeviationHigherPercentage * y; + value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble(); + } else { + double outlierLowerLimit = y - outlierDeviationLowerPercentage * y; + double outlierUpperLimit = y - outlierDeviationHigherPercentage * y; + value = outlierUpperLimit + (outlierLowerLimit - outlierUpperLimit) * random.nextDouble(); + } + } + x++; + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java new file mode 100644 index 0000000..cc83d2c --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class NormalMetricSeries implements AbstractMetricSeries { + + double mean = 0.0; + double sd = 1.0; + double outlierProbability = 0.0; + double outlierDeviationSDTimesLower = 0.0; + double outlierDeviationSDTimesHigher = 0.0; + boolean outlierOnRightEnd = true; + + Random random = new Random(); + double nonOutlierProbability; + + + public NormalMetricSeries(double mean, + double sd, + double outlierProbability, + double outlierDeviationSDTimesLower, + double outlierDeviationSDTimesHigher, + boolean outlierOnRightEnd) { + this.mean = mean; + this.sd = sd; + this.outlierProbability = outlierProbability; + this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower; + this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher; + this.outlierOnRightEnd = outlierOnRightEnd; + init(); + } + + private void init() { + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + if (probability <= nonOutlierProbability) { + value = random.nextGaussian() * sd + mean; + } else { + if (outlierOnRightEnd) { + value = mean + (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd; + } else { + value = mean - (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd; + } + } + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java new file mode 100644 index 0000000..c4ed3ba --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries { + + double steadyStateValue = 0.0; + double steadyStateDeviationPercentage = 0.0; + double turbulentPeriodDeviationLowerPercentage = 0.3; + double turbulentPeriodDeviationHigherPercentage = 0.5; + int turbulentPeriodLength = 5; + int turbulentStatePosition = 1; + int approximateSeriesLength = 10; + + Random random = new Random(); + double valueDeviationLowerLimit; + double valueDeviationHigherLimit; + double tPeriodLowerLimit; + double tPeriodUpperLimit; + int tPeriodStartIndex = 0; + int index = 0; + + public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength, + double steadyStateValue, + double steadyStateDeviationPercentage, + double turbulentPeriodDeviationLowerPercentage, + double turbulentPeriodDeviationHigherPercentage, + int turbulentPeriodLength, + int turbulentStatePosition) { + this.approximateSeriesLength = approximateSeriesLength; + this.steadyStateValue = steadyStateValue; + this.steadyStateDeviationPercentage = steadyStateDeviationPercentage; + this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage; + this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage; + this.turbulentPeriodLength = turbulentPeriodLength; + this.turbulentStatePosition = turbulentStatePosition; + init(); + } + + private void init() { + + if (turbulentStatePosition == 1) { + tPeriodStartIndex = (int) (0.25 * approximateSeriesLength + (0.25 * approximateSeriesLength * random.nextDouble())); + } else if (turbulentStatePosition == 2) { + tPeriodStartIndex = approximateSeriesLength - turbulentPeriodLength; + } + + valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue; + valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue; + + tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue; + tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue; + } + + @Override + public double nextValue() { + + double value; + + if (index >= tPeriodStartIndex && index <= (tPeriodStartIndex + turbulentPeriodLength)) { + value = tPeriodLowerLimit + (tPeriodUpperLimit - tPeriodLowerLimit) * random.nextDouble(); + } else { + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + } + index++; + return value; + } + + @Override + public double[] getSeries(int n) { + + double[] series = new double[n]; + int turbulentPeriodStartIndex = 0; + + if (turbulentStatePosition == 1) { + turbulentPeriodStartIndex = (int) (0.25 * n + (0.25 * n * random.nextDouble())); + } else if (turbulentStatePosition == 2) { + turbulentPeriodStartIndex = n - turbulentPeriodLength; + } + + double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue; + double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue; + + double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue; + double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue; + + for (int i = 0; i < n; i++) { + if (i >= turbulentPeriodStartIndex && i < (turbulentPeriodStartIndex + turbulentPeriodLength)) { + series[i] = turbulentPeriodLowerLimit + (turbulentPeriodUpperLimit - turbulentPeriodLowerLimit) * random.nextDouble(); + } else { + series[i] = valueDevLowerLimit + (valueDevHigherLimit - valueDevLowerLimit) * random.nextDouble(); + } + } + + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java new file mode 100644 index 0000000..d5beb48 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class StepFunctionMetricSeries implements AbstractMetricSeries { + + double startValue = 0.0; + double steadyValueDeviationPercentage = 0.0; + double steadyPeriodSlope = 0.5; + int steadyPeriodMinSize = 10; + int steadyPeriodMaxSize = 20; + double stepChangePercentage = 0.0; + boolean upwardStep = true; + + Random random = new Random(); + + // y = mx + c + double y; + double m; + double x; + double c; + int currentStepSize; + int currentIndex; + + public StepFunctionMetricSeries(double startValue, + double steadyValueDeviationPercentage, + double steadyPeriodSlope, + int steadyPeriodMinSize, + int steadyPeriodMaxSize, + double stepChangePercentage, + boolean upwardStep) { + this.startValue = startValue; + this.steadyValueDeviationPercentage = steadyValueDeviationPercentage; + this.steadyPeriodSlope = steadyPeriodSlope; + this.steadyPeriodMinSize = steadyPeriodMinSize; + this.steadyPeriodMaxSize = steadyPeriodMaxSize; + this.stepChangePercentage = stepChangePercentage; + this.upwardStep = upwardStep; + init(); + } + + private void init() { + y = startValue; + m = steadyPeriodSlope; + x = 1; + c = y - (m * x); + + currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble()); + currentIndex = 0; + } + + @Override + public double nextValue() { + + double value = 0.0; + + if (currentIndex < currentStepSize) { + y = m * x + c; + double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y; + double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y; + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + x++; + currentIndex++; + } + + if (currentIndex == currentStepSize) { + currentIndex = 0; + currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble()); + if (upwardStep) { + y = y + stepChangePercentage * y; + } else { + y = y - stepChangePercentage * y; + } + x = 1; + c = y - (m * x); + } + + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java new file mode 100644 index 0000000..a2b0eea --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class UniformMetricSeries implements AbstractMetricSeries { + + double value = 0.0; + double deviationPercentage = 0.0; + double outlierProbability = 0.0; + double outlierDeviationLowerPercentage = 0.0; + double outlierDeviationHigherPercentage = 0.0; + boolean outliersAboveValue= true; + + Random random = new Random(); + double valueDeviationLowerLimit; + double valueDeviationHigherLimit; + double outlierLeftLowerLimit; + double outlierLeftHigherLimit; + double outlierRightLowerLimit; + double outlierRightUpperLimit; + double nonOutlierProbability; + + + public UniformMetricSeries(double value, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + this.value = value; + this.deviationPercentage = deviationPercentage; + this.outlierProbability = outlierProbability; + this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage; + this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage; + this.outliersAboveValue = outliersAboveValue; + init(); + } + + private void init() { + valueDeviationLowerLimit = value - deviationPercentage * value; + valueDeviationHigherLimit = value + deviationPercentage * value; + + outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value; + outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value; + outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value; + outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value; + + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + if (probability <= nonOutlierProbability) { + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + } else { + if (!outliersAboveValue) { + value = outlierLeftLowerLimit + (outlierLeftHigherLimit - outlierLeftLowerLimit) * random.nextDouble(); + } else { + value = outlierRightLowerLimit + (outlierRightUpperLimit - outlierRightLowerLimit) * random.nextDouble(); + } + } + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java deleted file mode 100644 index daaee5c..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AmsKafkaProducer.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.spark; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.ambari.metrics.alertservice.common.TimelineMetric; -import org.apache.ambari.metrics.alertservice.common.TimelineMetrics; -import org.apache.kafka.clients.producer.*; - -import java.util.Properties; -import java.util.TreeMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -public class AmsKafkaProducer { - - Producer producer; - private static String topicName = "ambari-metrics-topic"; - - public AmsKafkaProducer(String kafkaServers) { - Properties configProperties = new Properties(); - configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers); //"avijayan-ams-2.openstacklocal:6667" - configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); - configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer"); - producer = new KafkaProducer(configProperties); - } - - public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException { - - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode jsonNode = objectMapper.valueToTree(timelineMetrics); - ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName,jsonNode); - Future<RecordMetadata> kafkaFuture = producer.send(rec); - - System.out.println(kafkaFuture.isDone()); - System.out.println(kafkaFuture.get().topic()); - } - - public static void main(String[] args) throws ExecutionException, InterruptedException { - final long now = System.currentTimeMillis(); - - TimelineMetrics timelineMetrics = new TimelineMetrics(); - TimelineMetric metric1 = new TimelineMetric(); - metric1.setMetricName("mem_free"); - metric1.setHostName("avijayan-ams-3.openstacklocal"); - metric1.setTimestamp(now); - metric1.setStartTime(now - 1000); - metric1.setAppId("HOST"); - metric1.setType("Integer"); - - TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); - - for (int i = 0; i<20;i++) { - double metric = 20000 + Math.random(); - metricValues.put(now - i*100, metric); - } - - metric1.setMetricValues(metricValues); - -// metric1.setMetricValues(new TreeMap<Long, Double>() {{ -// put(now - 100, 1.20); -// put(now - 200, 11.25); -// put(now - 300, 1.30); -// put(now - 400, 4.50); -// put(now - 500, 16.35); -// put(now - 400, 5.50); -// }}); - - timelineMetrics.getMetrics().add(metric1); - - for (int i = 0; i<1; i++) { - new AmsKafkaProducer("avijayan-ams-2.openstacklocal:6667").sendMetrics(timelineMetrics); - Thread.sleep(1000); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java deleted file mode 100644 index d65790e..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/AnomalyMetricPublisher.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.spark; - -import org.apache.ambari.metrics.alertservice.common.MetricAnomaly; -import org.apache.ambari.metrics.alertservice.common.TimelineMetric; -import org.apache.ambari.metrics.alertservice.common.TimelineMetrics; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.codehaus.jackson.map.AnnotationIntrospector; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.annotate.JsonSerialize; -import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.InetAddress; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.*; - -public class AnomalyMetricPublisher implements Serializable { - - private String hostName = "UNKNOWN.example.com"; - private String instanceId = null; - private String serviceName = "anomaly-engine"; - private String collectorHost; - private String protocol; - private String port; - private static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; - private static final Log LOG = LogFactory.getLog(AnomalyMetricPublisher.class); - private static ObjectMapper mapper; - - static { - mapper = new ObjectMapper(); - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); - mapper.setAnnotationIntrospector(introspector); - mapper.getSerializationConfig() - .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); - } - - public AnomalyMetricPublisher(String collectorHost, String protocol, String port) { - this.collectorHost = collectorHost; - this.protocol = protocol; - this.port = port; - this.hostName = getDefaultLocalHostName(); - } - - private String getDefaultLocalHostName() { - try { - return InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException e) { - LOG.info("Error getting host address"); - } - return null; - } - - public void publish(List<MetricAnomaly> metricAnomalies) { - LOG.info("Sending metric anomalies of size : " + metricAnomalies.size()); - List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies); - if (!metricList.isEmpty()) { - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(metricList); - emitMetrics(timelineMetrics); - } - } - - private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> metricAnomalies) { - List<TimelineMetric> metrics = new ArrayList<>(); - - if (metricAnomalies.isEmpty()) { - return metrics; - } - - long currentTime = System.currentTimeMillis(); - MetricAnomaly prevAnomaly = metricAnomalies.get(0); - - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(prevAnomaly.getMetricKey() + "_" + prevAnomaly.getMethodResult().getMethodType()); - timelineMetric.setAppId(serviceName); - timelineMetric.setInstanceId(instanceId); - timelineMetric.setHostName(hostName); - timelineMetric.setStartTime(currentTime); - - TreeMap<Long,Double> metricValues = new TreeMap<>(); - metricValues.put(prevAnomaly.getTimestamp(), prevAnomaly.getMetricValue()); - MetricAnomaly currentAnomaly; - - for (int i = 1; i < metricAnomalies.size(); i++) { - currentAnomaly = metricAnomalies.get(i); - if (currentAnomaly.getMetricKey().equals(prevAnomaly.getMetricKey())) { - metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue()); - } else { - timelineMetric.setMetricValues(metricValues); - metrics.add(timelineMetric); - - timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(currentAnomaly.getMetricKey() + "_" + currentAnomaly.getMethodResult().getMethodType()); - timelineMetric.setAppId(serviceName); - timelineMetric.setInstanceId(instanceId); - timelineMetric.setHostName(hostName); - timelineMetric.setStartTime(currentTime); - metricValues = new TreeMap<>(); - metricValues.put(currentAnomaly.getTimestamp(), currentAnomaly.getMetricValue()); - prevAnomaly = currentAnomaly; - } - } - - timelineMetric.setMetricValues(metricValues); - metrics.add(timelineMetric); - return metrics; - } - - private boolean emitMetrics(TimelineMetrics metrics) { - String connectUrl = constructTimelineMetricUri(); - String jsonData = null; - LOG.info("EmitMetrics connectUrl = " + connectUrl); - try { - jsonData = mapper.writeValueAsString(metrics); - } catch (IOException e) { - LOG.error("Unable to parse metrics", e); - } - if (jsonData != null) { - return emitMetricsJson(connectUrl, jsonData); - } - return false; - } - - private HttpURLConnection getConnection(String spec) throws IOException { - return (HttpURLConnection) new URL(spec).openConnection(); - } - - private boolean emitMetricsJson(String connectUrl, String jsonData) { - int timeout = 10000; - HttpURLConnection connection = null; - try { - if (connectUrl == null) { - throw new IOException("Unknown URL. Unable to connect to metrics collector."); - } - connection = getConnection(connectUrl); - - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Type", "application/json"); - connection.setRequestProperty("Connection", "Keep-Alive"); - connection.setConnectTimeout(timeout); - connection.setReadTimeout(timeout); - connection.setDoOutput(true); - - if (jsonData != null) { - try (OutputStream os = connection.getOutputStream()) { - os.write(jsonData.getBytes("UTF-8")); - } - } - - int statusCode = connection.getResponseCode(); - - if (statusCode != 200) { - LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + - "statusCode = " + statusCode); - } else { - LOG.info("Metrics posted to Collector " + connectUrl); - } - return true; - } catch (IOException ioe) { - LOG.error(ioe.getMessage()); - } - return false; - } - - private String constructTimelineMetricUri() { - StringBuilder sb = new StringBuilder(protocol); - sb.append("://"); - sb.append(collectorHost); - sb.append(":"); - sb.append(port); - sb.append(WS_V1_TIMELINE_METRICS); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java deleted file mode 100644 index 3989c67..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/spark/MetricAnomalyDetector.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.spark; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.ambari.metrics.alertservice.common.MetricAnomaly; -import org.apache.ambari.metrics.alertservice.common.TimelineMetric; -import org.apache.ambari.metrics.alertservice.common.TimelineMetrics; -import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel; -import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel; -import org.apache.ambari.metrics.alertservice.methods.ema.EmaModelLoader; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.streaming.Duration; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.kafka.KafkaUtils; -import scala.Tuple2; - -import java.util.*; - -public class MetricAnomalyDetector { - - private static final Log LOG = LogFactory.getLog(MetricAnomalyDetector.class); - private static String groupId = "ambari-metrics-group"; - private static String topicName = "ambari-metrics-topic"; - private static int numThreads = 1; - - //private static String zkQuorum = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181"; - //private static Map<String, String> kafkaParams = new HashMap<>(); - //static { - // kafkaParams.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "avijayan-ams-2.openstacklocal:6667"); - // kafkaParams.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - // kafkaParams.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer"); - // kafkaParams.put("metadata.broker.list", "avijayan-ams-2.openstacklocal:6667"); - //} - - public MetricAnomalyDetector() { - } - - public static void main(String[] args) throws InterruptedException { - - - if (args.length < 6) { - System.err.println("Usage: MetricAnomalyDetector <method1,method2> <appid1,appid2> <collector_host> <port> <protocol> <zkQuorum>"); - System.exit(1); - } - - List<String> appIds = Arrays.asList(args[1].split(",")); - String collectorHost = args[2]; - String collectorPort = args[3]; - String collectorProtocol = args[4]; - String zkQuorum = args[5]; - - List<MetricAnomalyModel> anomalyDetectionModels = new ArrayList<>(); - AnomalyMetricPublisher anomalyMetricPublisher = new AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort); - - SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector"); - - JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); - - for (String method : args[0].split(",")) { - if (method.equals("ema")) { - LOG.info("Model EMA requested."); - EmaModel emaModel = new EmaModelLoader().load(jssc.sparkContext().sc(), "/tmp/model/ema"); - anomalyDetectionModels.add(emaModel); - } - } - - JavaPairReceiverInputDStream<String, String> messages = - KafkaUtils.createStream(jssc, zkQuorum, groupId, Collections.singletonMap(topicName, numThreads)); - - //Convert JSON string to TimelineMetrics. - JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() { - @Override - public TimelineMetrics call(Tuple2<String, String> message) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class); - return metrics; - } - }); - - //Group TimelineMetric by AppId. - JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair( - timelineMetrics -> new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(),timelineMetrics) - ); - - appMetricStream.print(); - - //Filter AppIds that are not needed. - JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() { - @Override - public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception { - return appIds.contains(appMetricTuple._1); - } - }); - - filteredAppMetricStream.print(); - - filteredAppMetricStream.foreachRDD(rdd -> { - rdd.foreach( - tuple2 -> { - TimelineMetrics metrics = tuple2._2(); - for (TimelineMetric metric : metrics.getMetrics()) { - - TimelineMetric timelineMetric = - new TimelineMetric(metric.getMetricName(), metric.getAppId(), metric.getHostName(), metric.getMetricValues()); - - for (MetricAnomalyModel model : anomalyDetectionModels) { - List<MetricAnomaly> anomalies = model.test(timelineMetric); - anomalyMetricPublisher.publish(anomalies); - for (MetricAnomaly anomaly : anomalies) { - LOG.info(anomaly.getAnomalyAsString()); - } - - } - } - }); - }); - - jssc.start(); - jssc.awaitTermination(); - } -} - - - - http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r index b25e79d..bca3366 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r @@ -25,12 +25,9 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval granularity <- train_data[2,1] - train_data[1,1] test_start <- test_data[1,1] test_end <- test_data[length(test_data[1,]),1] - cat ("\n test_start : ", as.numeric(test_start)) train_start <- test_start - num_historic_periods*period - cat ("\n train_start : ", as.numeric(train_start)) # round to start of day train_start <- train_start - (train_start %% interval) - cat ("\n train_start after rounding: ", as.numeric(train_start)) time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT") test_data_day <- time$wday @@ -39,7 +36,6 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval for ( i in length(train_data[,1]):1) { ts <- train_data[i,1] if ( ts < train_start) { - cat ("\n Breaking out of loop : ", ts) break } time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT") @@ -49,20 +45,14 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval } } - cat ("\n Train data length : ", length(train_data[,1])) - cat ("\n Test data length : ", length(test_data[,1])) - cat ("\n Historic data length : ", length(h_data)) if (length(h_data) < 2*length(test_data[,1])) { cat ("\nNot enough training data") return (anomalies) } past_median <- median(h_data) - cat ("\npast_median : ", past_median) past_sd <- sd(h_data) - cat ("\npast_sd : ", past_sd) curr_median <- median(test_data[,2]) - cat ("\ncurr_median : ", curr_median) if (abs(curr_median - past_median) > n * past_sd) { anomaly <- c(test_start, test_end, curr_median, past_median, past_sd) @@ -70,7 +60,7 @@ hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval } if(length(anomalies) > 0) { - names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD") + names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD") } return (anomalies) http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r index b4dfdcb..f22bc15 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r @@ -24,7 +24,7 @@ ams_ks <- function(train_data, test_data, p_value) { # test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2] anomalies <- data.frame() - res <- ks.test(train_data, test_data[,2]) + res <- ks.test(train_data[,2], test_data[,2]) if (res[2] < p_value) { anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2]) http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r index 7fffbdd..f33b6ec 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r @@ -32,12 +32,17 @@ ams_tukeys <- function(train_data, test_data, n) { lb <- quantiles[2] - n*iqr ub <- quantiles[4] + n*iqr if ( (x < lb) || (x > ub) ) { - anomaly <- c(test_data[i,1], x) + if (x < lb) { + niqr <- (quantiles[2] - x) / iqr + } else { + niqr <- (x - quantiles[4]) / iqr + } + anomaly <- c(test_data[i,1], x, niqr) anomalies <- rbind(anomalies, anomaly) } } if(length(anomalies) > 0) { - names(anomalies) <- c("TS", "Value") + names(anomalies) <- c("TS", "Value", "niqr") } return (anomalies) } http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R deleted file mode 100644 index 3827006..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R +++ /dev/null @@ -1,36 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?' -#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES' -#data_url <- paste(url_prefix, query, sep ="") -#data_url <- paste(data_url, url_suffix, sep="") - -get_data <- function(url) { - library(rjson) - res <- fromJSON(readLines(url)[1]) - return (res) -} - -find_index <- function(data, ts) { - for (i in 1:length(data)) { - if (as.numeric(ts) == as.numeric(data[i])) { - return (i) - } - } - return (-1) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java new file mode 100644 index 0000000..539ca40 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype; + +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.TreeMap; + +public class TestEmaTechnique { + + @Test + public void testEmaInitialization() { + + EmaTechnique ema = new EmaTechnique(0.5, 3); + Assert.assertTrue(ema.getTrackedEmas().isEmpty()); + Assert.assertTrue(ema.getStartingWeight() == 0.5); + Assert.assertTrue(ema.getStartTimesSdev() == 3); + } + + @Test + public void testEma() { + EmaTechnique ema = new EmaTechnique(0.5, 3); + + long now = System.currentTimeMillis(); + + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("M1"); + metric1.setHostName("H1"); + metric1.setStartTime(now - 1000); + metric1.setAppId("A1"); + metric1.setInstanceId(null); + metric1.setType("Integer"); + + //Train + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + for (int i = 0; i < 50; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 100, metric); + } + metric1.setMetricValues(metricValues); + List<MetricAnomaly> anomalyList = ema.test(metric1); +// Assert.assertTrue(anomalyList.isEmpty()); + + metricValues = new TreeMap<Long, Double>(); + for (int i = 0; i < 50; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 100, metric); + } + metric1.setMetricValues(metricValues); + anomalyList = ema.test(metric1); + Assert.assertTrue(!anomalyList.isEmpty()); + int l1 = anomalyList.size(); + + Assert.assertTrue(ema.updateModel(metric1, false, 20)); + anomalyList = ema.test(metric1); + int l2 = anomalyList.size(); + Assert.assertTrue(l2 < l1); + + Assert.assertTrue(ema.updateModel(metric1, true, 50)); + anomalyList = ema.test(metric1); + int l3 = anomalyList.size(); + Assert.assertTrue(l3 > l2 && l3 > l1); + + } +}
