http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java new file mode 100644 index 0000000..a424f8e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/TestSeriesInputRequest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.prototype.testing.utilities; + +import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Collections; +import java.util.Map; + +@XmlRootElement +public class TestSeriesInputRequest { + + private String seriesName; + private String seriesType; + private Map<String, String> configs; + + public TestSeriesInputRequest() { + } + + public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) { + this.seriesName = seriesName; + this.seriesType = seriesType; + this.configs = configs; + } + + public String getSeriesName() { + return seriesName; + } + + public void setSeriesName(String seriesName) { + this.seriesName = seriesName; + } + + public String getSeriesType() { + return seriesType; + } + + public void setSeriesType(String seriesType) { + this.seriesType = seriesType; + } + + public Map<String, String> getConfigs() { + return configs; + } + + public void setConfigs(Map<String, String> configs) { + this.configs = configs; + } + + @Override + public boolean equals(Object o) { + TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o; + return anotherInput.getSeriesName().equals(this.getSeriesName()); + } + + @Override + public int hashCode() { + return seriesName.hashCode(); + } + + public static void main(String[] args) { + + ObjectMapper objectMapper = new ObjectMapper(); + TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value")); + try { + System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java new file mode 100644 index 0000000..a8e31bf --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/AbstractMetricSeries.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Contract for a generator of metric values.
 *
 * <p>A series can be consumed one value at a time through
 * {@link #nextValue()} or in bulk through {@link #getSeries(int)}.
 */
public interface AbstractMetricSeries {

  /**
   * Produces the next value of the series.
   *
   * @return the next value
   */
  double nextValue();

  /**
   * Produces a batch of values.
   *
   * @param n number of values requested
   * @return an array holding the generated values
   */
  double[] getSeries(int n);
}
+ */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class DualBandMetricSeries implements AbstractMetricSeries { + + double lowBandValue = 0.0; + double lowBandDeviationPercentage = 0.0; + int lowBandPeriodSize = 10; + double highBandValue = 1.0; + double highBandDeviationPercentage = 0.0; + int highBandPeriodSize = 10; + + Random random = new Random(); + double lowBandValueLowerLimit, lowBandValueHigherLimit; + double highBandLowerLimit, highBandUpperLimit; + int l = 0, h = 0; + + public DualBandMetricSeries(double lowBandValue, + double lowBandDeviationPercentage, + int lowBandPeriodSize, + double highBandValue, + double highBandDeviationPercentage, + int highBandPeriodSize) { + this.lowBandValue = lowBandValue; + this.lowBandDeviationPercentage = lowBandDeviationPercentage; + this.lowBandPeriodSize = lowBandPeriodSize; + this.highBandValue = highBandValue; + this.highBandDeviationPercentage = highBandDeviationPercentage; + this.highBandPeriodSize = highBandPeriodSize; + init(); + } + + private void init() { + lowBandValueLowerLimit = lowBandValue - lowBandDeviationPercentage * lowBandValue; + lowBandValueHigherLimit = lowBandValue + lowBandDeviationPercentage * lowBandValue; + highBandLowerLimit = highBandValue - highBandDeviationPercentage * highBandValue; + highBandUpperLimit = highBandValue + highBandDeviationPercentage * highBandValue; + } + + @Override + public double nextValue() { + + double value = 0.0; + + if (l < lowBandPeriodSize) { + value = lowBandValueLowerLimit + (lowBandValueHigherLimit - lowBandValueLowerLimit) * random.nextDouble(); + l++; + } else if (h < highBandPeriodSize) { + value = highBandLowerLimit + (highBandUpperLimit - highBandLowerLimit) * random.nextDouble(); + h++; + } + + if (l == lowBandPeriodSize && h == highBandPeriodSize) { + l = 0; + h = 0; + } + + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i 
< n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java new file mode 100644 index 0000000..1e37ff3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorFactory.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Arrays; +import java.util.Map; +import java.util.Random; + +public class MetricSeriesGeneratorFactory { + + /** + * Return a normally distributed data series with some deviation % and outliers. 
+ * + * @param n size of the data series + * @param value The value around which the uniform data series is centered on. + * @param deviationPercentage The allowed deviation % on either side of the uniform value. For example, if value = 10, and deviation % is 0.1, the series values lie between 0.9 to 1.1. + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationLowerPercentage min percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13. + * @param outlierDeviationHigherPercentage max percentage outlier should be away from the uniform value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16. + * @param outliersAboveValue Outlier should be greater or smaller than the value. + * @return uniform series + */ + public static double[] createUniformSeries(int n, + double value, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + + UniformMetricSeries metricSeries = new UniformMetricSeries(value, + deviationPercentage, + outlierProbability, + outlierDeviationLowerPercentage, + outlierDeviationHigherPercentage, + outliersAboveValue); + + return metricSeries.getSeries(n); + } + + + /** + * /** + * Returns a normally distributed series. + * + * @param n size of the data series + * @param mean mean of the distribution + * @param sd sd of the distribution + * @param outlierProbability sd of the distribution + * @param outlierDeviationSDTimesLower Lower Limit of the outlier with respect to times sdev from the mean. + * @param outlierDeviationSDTimesHigher Higher Limit of the outlier with respect to times sdev from the mean. + * @param outlierOnRightEnd Outlier should be on the right end or the left end. 
+ * @return normal series + */ + public static double[] createNormalSeries(int n, + double mean, + double sd, + double outlierProbability, + double outlierDeviationSDTimesLower, + double outlierDeviationSDTimesHigher, + boolean outlierOnRightEnd) { + + + NormalMetricSeries metricSeries = new NormalMetricSeries(mean, + sd, + outlierProbability, + outlierDeviationSDTimesLower, + outlierDeviationSDTimesHigher, + outlierOnRightEnd); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a monotonically increasing / decreasing series + * + * @param n size of the data series + * @param startValue Start value of the monotonic sequence + * @param slope direction of monotonicity m > 0 for increasing and m < 0 for decreasing. + * @param deviationPercentage The allowed deviation % on either side of the current 'y' value. For example, if current value = 10 according to slope, and deviation % is 0.1, the series values lie between 0.9 to 1.1. + * @param outlierProbability The probability of finding an outlier in the series. + * @param outlierDeviationLowerPercentage min percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 30%, the outlier is 7 and 13. + * @param outlierDeviationHigherPercentage max percentage outlier should be away from the current 'y' value in % terms. if value = 10 and outlierDeviationPercentage = 60%, the outlier is 4 and 16. + * @param outliersAboveValue Outlier should be greater or smaller than the 'y' value. 
+ * @return + */ + public static double[] createMonotonicSeries(int n, + double startValue, + double slope, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + + MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(startValue, + slope, + deviationPercentage, + outlierProbability, + outlierDeviationLowerPercentage, + outlierDeviationHigherPercentage, + outliersAboveValue); + + return metricSeries.getSeries(n); + } + + + /** + * Returns a dual band series (lower and higher) + * + * @param n size of the data series + * @param lowBandValue lower band value + * @param lowBandDeviationPercentage lower band deviation + * @param lowBandPeriodSize lower band + * @param highBandValue high band centre value + * @param highBandDeviationPercentage high band deviation. + * @param highBandPeriodSize high band size + * @return + */ + public static double[] getDualBandSeries(int n, + double lowBandValue, + double lowBandDeviationPercentage, + int lowBandPeriodSize, + double highBandValue, + double highBandDeviationPercentage, + int highBandPeriodSize) { + + DualBandMetricSeries metricSeries = new DualBandMetricSeries(lowBandValue, + lowBandDeviationPercentage, + lowBandPeriodSize, + highBandValue, + highBandDeviationPercentage, + highBandPeriodSize); + + return metricSeries.getSeries(n); + } + + /** + * Returns a step function series. + * + * @param n size of the data series + * @param startValue start steady value + * @param steadyValueDeviationPercentage required devation in the steady state value + * @param steadyPeriodSlope direction of monotonicity m > 0 for increasing and m < 0 for decreasing, m = 0 no increase or decrease. + * @param steadyPeriodMinSize min size for step period + * @param steadyPeriodMaxSize max size for step period. 
+ * @param stepChangePercentage Increase / decrease in steady state to denote a step in terms of deviation percentage from the last value. + * @param upwardStep upward or downward step. + * @return + */ + public static double[] getStepFunctionSeries(int n, + double startValue, + double steadyValueDeviationPercentage, + double steadyPeriodSlope, + int steadyPeriodMinSize, + int steadyPeriodMaxSize, + double stepChangePercentage, + boolean upwardStep) { + + StepFunctionMetricSeries metricSeries = new StepFunctionMetricSeries(startValue, + steadyValueDeviationPercentage, + steadyPeriodSlope, + steadyPeriodMinSize, + steadyPeriodMaxSize, + stepChangePercentage, + upwardStep); + + return metricSeries.getSeries(n); + } + + /** + * Series with small period of turbulence and then back to steady. + * + * @param n size of the data series + * @param steadyStateValue steady state center value + * @param steadyStateDeviationPercentage steady state deviation in percentage + * @param turbulentPeriodDeviationLowerPercentage turbulent state lower limit in terms of percentage from centre value. + * @param turbulentPeriodDeviationHigherPercentage turbulent state higher limit in terms of percentage from centre value. + * @param turbulentPeriodLength turbulent period length (number of points) + * @param turbulentStatePosition Where the turbulent state should be 0 - at the beginning, 1 - in the middle (25% - 50% of the series), 2 - at the end of the series. 
+ * @return + */ + public static double[] getSteadySeriesWithTurbulentPeriod(int n, + double steadyStateValue, + double steadyStateDeviationPercentage, + double turbulentPeriodDeviationLowerPercentage, + double turbulentPeriodDeviationHigherPercentage, + int turbulentPeriodLength, + int turbulentStatePosition + ) { + + + SteadyWithTurbulenceMetricSeries metricSeries = new SteadyWithTurbulenceMetricSeries(n, + steadyStateValue, + steadyStateDeviationPercentage, + turbulentPeriodDeviationLowerPercentage, + turbulentPeriodDeviationHigherPercentage, + turbulentPeriodLength, + turbulentStatePosition); + + return metricSeries.getSeries(n); + } + + + public static double[] generateSeries(String type, int n, Map<String, String> configs) { + + double[] series; + switch (type) { + + case "normal": + series = createNormalSeries(n, + Double.parseDouble(configs.getOrDefault("mean", "0")), + Double.parseDouble(configs.getOrDefault("sd", "1")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")), + Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true"))); + break; + + case "uniform": + series = createUniformSeries(n, + Double.parseDouble(configs.getOrDefault("value", "10")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "monotonic": + series = createMonotonicSeries(n, + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("slope", "0")), + 
Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "dualband": + series = getDualBandSeries(n, + Double.parseDouble(configs.getOrDefault("lowBandValue", "10")), + Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")), + Double.parseDouble(configs.getOrDefault("highBandValue", "10")), + Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0"))); + break; + + case "step": + series = getStepFunctionSeries(n, + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")), + Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true"))); + break; + + case "turbulence": + series = getSteadySeriesWithTurbulentPeriod(n, + Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")), + Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")), + Integer.parseInt(configs.getOrDefault("turbulentStatePosition", 
"0"))); + break; + + default: + series = createNormalSeries(n, + 0, + 1, + 0, + 0, + 0, + true); + } + return series; + } + + public static AbstractMetricSeries generateSeries(String type, Map<String, String> configs) { + + AbstractMetricSeries series; + switch (type) { + + case "normal": + series = new NormalMetricSeries(Double.parseDouble(configs.getOrDefault("mean", "0")), + Double.parseDouble(configs.getOrDefault("sd", "1")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesLower", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationSDTimesHigher", "0")), + Boolean.parseBoolean(configs.getOrDefault("outlierOnRightEnd", "true"))); + break; + + case "uniform": + series = new UniformMetricSeries( + Double.parseDouble(configs.getOrDefault("value", "10")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "monotonic": + series = new MonotonicMetricSeries( + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("slope", "0")), + Double.parseDouble(configs.getOrDefault("deviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierProbability", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("outlierDeviationHigherPercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("outliersAboveValue", "true"))); + break; + + case "dualband": + series = new DualBandMetricSeries( + Double.parseDouble(configs.getOrDefault("lowBandValue", "10")), + 
Double.parseDouble(configs.getOrDefault("lowBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("lowBandPeriodSize", "0")), + Double.parseDouble(configs.getOrDefault("highBandValue", "10")), + Double.parseDouble(configs.getOrDefault("highBandDeviationPercentage", "0")), + Integer.parseInt(configs.getOrDefault("highBandPeriodSize", "0"))); + break; + + case "step": + series = new StepFunctionMetricSeries( + Double.parseDouble(configs.getOrDefault("startValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyValueDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("steadyPeriodSlope", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMinSize", "0")), + Integer.parseInt(configs.getOrDefault("steadyPeriodMaxSize", "0")), + Double.parseDouble(configs.getOrDefault("stepChangePercentage", "0")), + Boolean.parseBoolean(configs.getOrDefault("upwardStep", "true"))); + break; + + case "turbulence": + series = new SteadyWithTurbulenceMetricSeries( + Integer.parseInt(configs.getOrDefault("approxSeriesLength", "100")), + Double.parseDouble(configs.getOrDefault("steadyStateValue", "10")), + Double.parseDouble(configs.getOrDefault("steadyStateDeviationPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationLowerPercentage", "0")), + Double.parseDouble(configs.getOrDefault("turbulentPeriodDeviationHigherPercentage", "10")), + Integer.parseInt(configs.getOrDefault("turbulentPeriodLength", "0")), + Integer.parseInt(configs.getOrDefault("turbulentStatePosition", "0"))); + break; + + default: + series = new NormalMetricSeries(0, + 1, + 0, + 0, + 0, + true); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java new file mode 100644 index 0000000..a883d08 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MonotonicMetricSeries.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class MonotonicMetricSeries implements AbstractMetricSeries { + + double startValue = 0.0; + double slope = 0.5; + double deviationPercentage = 0.0; + double outlierProbability = 0.0; + double outlierDeviationLowerPercentage = 0.0; + double outlierDeviationHigherPercentage = 0.0; + boolean outliersAboveValue = true; + + Random random = new Random(); + double nonOutlierProbability; + + // y = mx + c + double y; + double m; + double x; + double c; + + public MonotonicMetricSeries(double startValue, + double slope, + double deviationPercentage, + double outlierProbability, + double outlierDeviationLowerPercentage, + double outlierDeviationHigherPercentage, + boolean outliersAboveValue) { + this.startValue = startValue; + this.slope = slope; + this.deviationPercentage = deviationPercentage; + this.outlierProbability = outlierProbability; + this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage; + this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage; + this.outliersAboveValue = outliersAboveValue; + init(); + } + + private void init() { + y = startValue; + m = slope; + x = 1; + c = y - (m * x); + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + y = m * x + c; + if (probability <= nonOutlierProbability) { + double valueDeviationLowerLimit = y - deviationPercentage * y; + double valueDeviationHigherLimit = y + deviationPercentage * y; + value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); + } else { + if (outliersAboveValue) { + double outlierLowerLimit = y + outlierDeviationLowerPercentage * y; + double outlierUpperLimit = y + outlierDeviationHigherPercentage * y; + value = outlierLowerLimit + (outlierUpperLimit - outlierLowerLimit) * random.nextDouble(); + 
} else { + double outlierLowerLimit = y - outlierDeviationLowerPercentage * y; + double outlierUpperLimit = y - outlierDeviationHigherPercentage * y; + value = outlierUpperLimit + (outlierLowerLimit - outlierUpperLimit) * random.nextDouble(); + } + } + x++; + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java new file mode 100644 index 0000000..cc83d2c --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/NormalMetricSeries.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.seriesgenerator; + +import java.util.Random; + +public class NormalMetricSeries implements AbstractMetricSeries { + + double mean = 0.0; + double sd = 1.0; + double outlierProbability = 0.0; + double outlierDeviationSDTimesLower = 0.0; + double outlierDeviationSDTimesHigher = 0.0; + boolean outlierOnRightEnd = true; + + Random random = new Random(); + double nonOutlierProbability; + + + public NormalMetricSeries(double mean, + double sd, + double outlierProbability, + double outlierDeviationSDTimesLower, + double outlierDeviationSDTimesHigher, + boolean outlierOnRightEnd) { + this.mean = mean; + this.sd = sd; + this.outlierProbability = outlierProbability; + this.outlierDeviationSDTimesLower = outlierDeviationSDTimesLower; + this.outlierDeviationSDTimesHigher = outlierDeviationSDTimesHigher; + this.outlierOnRightEnd = outlierOnRightEnd; + init(); + } + + private void init() { + nonOutlierProbability = 1.0 - outlierProbability; + } + + @Override + public double nextValue() { + + double value; + double probability = random.nextDouble(); + + if (probability <= nonOutlierProbability) { + value = random.nextGaussian() * sd + mean; + } else { + if (outlierOnRightEnd) { + value = mean + (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd; + } else { + value = mean - (outlierDeviationSDTimesLower + (outlierDeviationSDTimesHigher - outlierDeviationSDTimesLower) * random.nextDouble()) * sd; + } + } + return value; + } + + @Override + public double[] getSeries(int n) { + double[] series = new double[n]; + for (int i = 0; i < n; i++) { + series[i] = nextValue(); + } + return series; + } + +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java new file mode 100644 index 0000000..c4ed3ba --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/SteadyWithTurbulenceMetricSeries.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.metrics.alertservice.seriesgenerator;

import java.util.Random;

/**
 * Generates a synthetic metric series that stays close to a steady value except
 * for one "turbulent" window in which the values are shifted upwards.
 *
 * <p>Steady points are uniform in
 * {@code steadyStateValue * (1 -/+ steadyStateDeviationPercentage)}; turbulent points
 * are uniform between {@code steadyStateValue * (1 + turbulentPeriodDeviationLowerPercentage)}
 * and {@code steadyStateValue * (1 + turbulentPeriodDeviationHigherPercentage)}.
 * All "percentage" parameters are used as plain multipliers (0.3 means 30%).
 *
 * <p>{@code turbulentStatePosition} selects where the window starts: {@code 1} puts the
 * start at a random index between 25% and 50% of the series length, {@code 2} puts the
 * window at the very end of the series, and any other value leaves the start at index 0.
 */
public class SteadyWithTurbulenceMetricSeries implements AbstractMetricSeries {

  double steadyStateValue = 0.0;
  double steadyStateDeviationPercentage = 0.0;
  double turbulentPeriodDeviationLowerPercentage = 0.3;
  double turbulentPeriodDeviationHigherPercentage = 0.5;
  int turbulentPeriodLength = 5;
  int turbulentStatePosition = 1;
  int approximateSeriesLength = 10;

  Random random = new Random();
  double valueDeviationLowerLimit;
  double valueDeviationHigherLimit;
  double tPeriodLowerLimit;
  double tPeriodUpperLimit;
  int tPeriodStartIndex = 0;
  int index = 0;

  /**
   * @param approximateSeriesLength                  expected number of {@link #nextValue()} calls; used to
   *                                                 position the turbulent window for the streaming API
   * @param steadyStateValue                         centre of the steady band
   * @param steadyStateDeviationPercentage           half-width of the steady band, as a fraction of the steady value
   * @param turbulentPeriodDeviationLowerPercentage  lower edge of the turbulent band, as a fraction above the steady value
   * @param turbulentPeriodDeviationHigherPercentage upper edge of the turbulent band, as a fraction above the steady value
   * @param turbulentPeriodLength                    number of points in the turbulent window
   * @param turbulentStatePosition                   1 = window starts in the second quarter, 2 = window at the end
   */
  public SteadyWithTurbulenceMetricSeries(int approximateSeriesLength,
                                          double steadyStateValue,
                                          double steadyStateDeviationPercentage,
                                          double turbulentPeriodDeviationLowerPercentage,
                                          double turbulentPeriodDeviationHigherPercentage,
                                          int turbulentPeriodLength,
                                          int turbulentStatePosition) {
    this.approximateSeriesLength = approximateSeriesLength;
    this.steadyStateValue = steadyStateValue;
    this.steadyStateDeviationPercentage = steadyStateDeviationPercentage;
    this.turbulentPeriodDeviationLowerPercentage = turbulentPeriodDeviationLowerPercentage;
    this.turbulentPeriodDeviationHigherPercentage = turbulentPeriodDeviationHigherPercentage;
    this.turbulentPeriodLength = turbulentPeriodLength;
    this.turbulentStatePosition = turbulentStatePosition;
    init();
  }

  /** Pre-computes the window position and the value bands used by {@link #nextValue()}. */
  private void init() {
    tPeriodStartIndex = turbulentStartIndex(approximateSeriesLength);

    valueDeviationLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
    valueDeviationHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;

    tPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
    tPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;
  }

  /** Returns the first index of the turbulent window for a series of the given length. */
  private int turbulentStartIndex(int seriesLength) {
    if (turbulentStatePosition == 1) {
      return (int) (0.25 * seriesLength + (0.25 * seriesLength * random.nextDouble()));
    }
    if (turbulentStatePosition == 2) {
      return seriesLength - turbulentPeriodLength;
    }
    return 0;
  }

  /**
   * Produces the next point of the streaming series and advances the internal index.
   *
   * @return a turbulent value while the index is inside the turbulent window, a steady value otherwise
   */
  @Override
  public double nextValue() {

    double value;

    // Half-open window [start, start + length): exactly turbulentPeriodLength points,
    // matching getSeries(). The previous inclusive bound produced one extra turbulent point.
    if (index >= tPeriodStartIndex && index < (tPeriodStartIndex + turbulentPeriodLength)) {
      value = tPeriodLowerLimit + (tPeriodUpperLimit - tPeriodLowerLimit) * random.nextDouble();
    } else {
      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
    }
    index++;
    return value;
  }

  /**
   * Produces a self-contained series of exactly {@code n} points. The turbulent window is
   * positioned relative to {@code n} (not {@code approximateSeriesLength}) and the streaming
   * index used by {@link #nextValue()} is left untouched.
   *
   * @param n number of points to generate
   * @return array of length {@code n}
   */
  @Override
  public double[] getSeries(int n) {

    double[] series = new double[n];
    int turbulentPeriodStartIndex = turbulentStartIndex(n);

    // Bands are recomputed from the current field values on every call.
    double valueDevLowerLimit = steadyStateValue - steadyStateDeviationPercentage * steadyStateValue;
    double valueDevHigherLimit = steadyStateValue + steadyStateDeviationPercentage * steadyStateValue;

    double turbulentPeriodLowerLimit = steadyStateValue + turbulentPeriodDeviationLowerPercentage * steadyStateValue;
    double turbulentPeriodUpperLimit = steadyStateValue + turbulentPeriodDeviationHigherPercentage * steadyStateValue;

    for (int i = 0; i < n; i++) {
      if (i >= turbulentPeriodStartIndex && i < (turbulentPeriodStartIndex + turbulentPeriodLength)) {
        series[i] = turbulentPeriodLowerLimit + (turbulentPeriodUpperLimit - turbulentPeriodLowerLimit) * random.nextDouble();
      } else {
        series[i] = valueDevLowerLimit + (valueDevHigherLimit - valueDevLowerLimit) * random.nextDouble();
      }
    }

    return series;
  }

}
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java new file mode 100644 index 0000000..d5beb48 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.metrics.alertservice.seriesgenerator;

import java.util.Random;

/**
 * Generates a synthetic step-function series: the values follow a straight line
 * {@code y = m*x + c} (with optional random deviation) for a "steady period" of random
 * length, then the baseline jumps up or down by {@code stepChangePercentage} and a new
 * steady period starts.
 *
 * <p>The length of each steady period is drawn uniformly between
 * {@code steadyPeriodMinSize} and {@code steadyPeriodMaxSize}. "Percentage" parameters are
 * used as plain multipliers (0.1 means 10%).
 */
public class StepFunctionMetricSeries implements AbstractMetricSeries {

  double startValue = 0.0;
  double steadyValueDeviationPercentage = 0.0;
  double steadyPeriodSlope = 0.5;
  int steadyPeriodMinSize = 10;
  int steadyPeriodMaxSize = 20;
  double stepChangePercentage = 0.0;
  boolean upwardStep = true;

  Random random = new Random();

  // y = mx + c
  double y;
  double m;
  double x;
  double c;
  int currentStepSize;
  int currentIndex;

  /**
   * @param startValue                     baseline value of the first steady period
   * @param steadyValueDeviationPercentage random deviation around the line, as a fraction of the line value
   * @param steadyPeriodSlope              slope of the line inside a steady period
   * @param steadyPeriodMinSize            minimum number of points in a steady period
   * @param steadyPeriodMaxSize            upper bound used when drawing the steady period length
   * @param stepChangePercentage           size of the jump between periods, as a fraction of the current value
   * @param upwardStep                     {@code true} for upward jumps, {@code false} for downward jumps
   */
  public StepFunctionMetricSeries(double startValue,
                                  double steadyValueDeviationPercentage,
                                  double steadyPeriodSlope,
                                  int steadyPeriodMinSize,
                                  int steadyPeriodMaxSize,
                                  double stepChangePercentage,
                                  boolean upwardStep) {
    this.startValue = startValue;
    this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
    this.steadyPeriodSlope = steadyPeriodSlope;
    this.steadyPeriodMinSize = steadyPeriodMinSize;
    this.steadyPeriodMaxSize = steadyPeriodMaxSize;
    this.stepChangePercentage = stepChangePercentage;
    this.upwardStep = upwardStep;
    init();
  }

  /** Anchors the line so that it passes through {@code startValue} at {@code x = 1}. */
  private void init() {
    y = startValue;
    m = steadyPeriodSlope;
    x = 1;
    c = y - (m * x);

    currentStepSize = drawStepSize();
    currentIndex = 0;
  }

  /**
   * Draws the length of the next steady period. The result is clamped to at least 1:
   * a length of 0 would make {@link #nextValue()} emit a placeholder 0.0, and a negative
   * length would make it emit 0.0 forever because the period could never complete.
   */
  private int drawStepSize() {
    int drawn = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
    return Math.max(1, drawn);
  }

  /**
   * Produces the next point of the series, applying the step change once the current
   * steady period has been exhausted.
   *
   * @return the line value for the current position, randomly deviated by up to
   *         {@code steadyValueDeviationPercentage}
   */
  @Override
  public double nextValue() {

    double value = 0.0;

    if (currentIndex < currentStepSize) {
      y = m * x + c;
      double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
      double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
      value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
      x++;
      currentIndex++;
    }

    if (currentIndex == currentStepSize) {
      // End of the steady period: jump the baseline (computed from the undeviated line
      // value) and re-anchor the line at x = 1 for the next period.
      currentIndex = 0;
      currentStepSize = drawStepSize();
      if (upwardStep) {
        y = y + stepChangePercentage * y;
      } else {
        y = y - stepChangePercentage * y;
      }
      x = 1;
      c = y - (m * x);
    }

    return value;
  }

  /**
   * Produces {@code n} consecutive points by calling {@link #nextValue()} repeatedly.
   *
   * @param n number of points to generate
   * @return array of length {@code n}
   */
  @Override
  public double[] getSeries(int n) {
    double[] series = new double[n];
    for (int i = 0; i < n; i++) {
      series[i] = nextValue();
    }
    return series;
  }

}
package org.apache.ambari.metrics.alertservice.seriesgenerator;

import java.util.Random;

/**
 * Generates a synthetic metric series whose regular points are uniformly distributed in a
 * band around a fixed value, with occasional outliers drawn from a second band that lies
 * either above or below that value.
 *
 * <p>"Percentage" parameters are used as plain multipliers of {@code value}
 * (0.1 means 10%).
 */
public class UniformMetricSeries implements AbstractMetricSeries {

  double value = 0.0;
  double deviationPercentage = 0.0;
  double outlierProbability = 0.0;
  double outlierDeviationLowerPercentage = 0.0;
  double outlierDeviationHigherPercentage = 0.0;
  boolean outliersAboveValue= true;

  Random random = new Random();
  double valueDeviationLowerLimit;
  double valueDeviationHigherLimit;
  double outlierLeftLowerLimit;
  double outlierLeftHigherLimit;
  double outlierRightLowerLimit;
  double outlierRightUpperLimit;
  double nonOutlierProbability;

  /**
   * @param value                            centre of the regular band
   * @param deviationPercentage              half-width of the regular band, as a fraction of {@code value}
   * @param outlierProbability               probability that a generated point is an outlier
   * @param outlierDeviationLowerPercentage  nearest outlier distance from {@code value}, as a fraction of it
   * @param outlierDeviationHigherPercentage farthest outlier distance from {@code value}, as a fraction of it
   * @param outliersAboveValue               {@code true} to draw outliers above {@code value}, {@code false} below
   */
  public UniformMetricSeries(double value,
                             double deviationPercentage,
                             double outlierProbability,
                             double outlierDeviationLowerPercentage,
                             double outlierDeviationHigherPercentage,
                             boolean outliersAboveValue) {
    this.value = value;
    this.deviationPercentage = deviationPercentage;
    this.outlierProbability = outlierProbability;
    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
    this.outliersAboveValue = outliersAboveValue;

    // Regular band around the configured value.
    this.valueDeviationLowerLimit = this.value - this.deviationPercentage * this.value;
    this.valueDeviationHigherLimit = this.value + this.deviationPercentage * this.value;

    // Outlier bands: "left" lies below the value, "right" lies above it.
    this.outlierLeftLowerLimit = this.value - this.outlierDeviationHigherPercentage * this.value;
    this.outlierLeftHigherLimit = this.value - this.outlierDeviationLowerPercentage * this.value;
    this.outlierRightLowerLimit = this.value + this.outlierDeviationLowerPercentage * this.value;
    this.outlierRightUpperLimit = this.value + this.outlierDeviationHigherPercentage * this.value;

    this.nonOutlierProbability = 1.0 - this.outlierProbability;
  }

  /** Draws a uniform sample between the two limits using the shared random source. */
  private double uniformBetween(double lowerLimit, double upperLimit) {
    return lowerLimit + (upperLimit - lowerLimit) * random.nextDouble();
  }

  /**
   * Produces the next point of the series.
   *
   * @return a sample from the regular band, or from the configured outlier band
   */
  @Override
  public double nextValue() {
    // The first draw only decides whether this point is regular or an outlier.
    boolean regularPoint = random.nextDouble() <= nonOutlierProbability;
    if (regularPoint) {
      return uniformBetween(valueDeviationLowerLimit, valueDeviationHigherLimit);
    }
    if (outliersAboveValue) {
      return uniformBetween(outlierRightLowerLimit, outlierRightUpperLimit);
    }
    return uniformBetween(outlierLeftLowerLimit, outlierLeftHigherLimit);
  }

  /**
   * Produces {@code n} consecutive points by calling {@link #nextValue()} repeatedly.
   *
   * @param n number of points to generate
   * @return array of length {@code n}
   */
  @Override
  public double[] getSeries(int n) {
    double[] points = new double[n];
    int filled = 0;
    while (filled < n) {
      points[filled++] = nextValue();
    }
    return points;
  }

}
# R-scripts/ema.R
#
# Point-in-time anomaly detection based on an exponential moving average (EMA)
# and an exponential moving standard deviation (EMS):
#
#   EMA <- w * EMA + (1 - w) * x
#   EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
#   Alarm = abs(x - EMA) > n * EMS

# Detects anomalies in test_data using a single, global EMA/EMS model.
#
# train_data: numeric vector of training values. A data frame is also accepted,
#             in which case its second column is used as the values.
# test_data:  data frame (or matrix); column 1 = timestamp, column 2 = value.
# w:          smoothing weight given to the previous EMA/EMS.
# n:          alarm threshold, in multiples of EMS.
#
# Returns an empty data.frame() when nothing is flagged, otherwise a data frame
# with columns "TS" and "Value", one row per anomalous test point. The model
# keeps learning from every test point, anomalous or not.
ema_global <- function(train_data, test_data, w, n) {

  # Iterating a data frame with for() walks its columns, not its rows, so pull
  # out the value column explicitly when a frame is passed.
  if (is.data.frame(train_data)) {
    train_data <- train_data[, 2]
  }

  ema <- 0
  ems <- 0

  # Train step
  for (x in train_data) {
    ema <- w * ema + (1 - w) * x
    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
  }

  anomaly_ts <- c()
  anomaly_values <- c()

  # seq_len() (unlike 1:length(...)) yields no iterations for empty test data.
  for (i in seq_len(nrow(test_data))) {
    x <- test_data[i, 2]
    if (abs(x - ema) > n * ems) {
      anomaly_ts <- c(anomaly_ts, as.numeric(test_data[i, 1]))
      anomaly_values <- c(anomaly_values, x)
    }
    ema <- w * ema + (1 - w) * x
    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
  }

  if (length(anomaly_ts) == 0) {
    return (data.frame())
  }
  return (data.frame(TS = anomaly_ts, Value = anomaly_values))
}

# Detects anomalies in test_data using one EMA/EMS model per day of the week.
#
# train_data, test_data: data frames (or matrices); column 1 = timestamp in
#             milliseconds since the Unix epoch (it is divided by 1000 before
#             being converted to a date in GMT), column 2 = value.
# w:          smoothing weight given to the previous EMA/EMS.
# n:          alarm threshold, in multiples of EMS.
#
# Returns an empty data.frame() when nothing is flagged, otherwise a data frame
# with columns "TS" and "Value", one row per anomalous test point.
ema_daily <- function(train_data, test_data, w, n) {

  # POSIXlt$wday is 0 (Sunday) .. 6 (Saturday); R vectors are 1-based, so the
  # model slot is wday + 1. Training and testing must use the same mapping:
  # indexing with 0 is a silent no-op in R.
  weekday_slot <- function(ts_millis) {
    time <- as.POSIXlt(as.numeric(ts_millis) / 1000, origin = "1970-01-01", tz = "GMT")
    time$wday + 1
  }

  ema <- vector("numeric", 7)
  ems <- vector("numeric", 7)

  # Train step
  for (i in seq_len(nrow(train_data))) {
    x <- train_data[i, 2]
    slot <- weekday_slot(train_data[i, 1])
    ema[slot] <- w * ema[slot] + (1 - w) * x
    ems[slot] <- sqrt(w * ems[slot]^2 + (1 - w) * (x - ema[slot])^2)
  }

  anomaly_ts <- c()
  anomaly_values <- c()

  for (i in seq_len(nrow(test_data))) {
    x <- test_data[i, 2]
    slot <- weekday_slot(test_data[i, 1])

    if (abs(x - ema[slot]) > n * ems[slot]) {
      anomaly_ts <- c(anomaly_ts, as.numeric(test_data[i, 1]))
      anomaly_values <- c(anomaly_values, x)
    }
    ema[slot] <- w * ema[slot] + (1 - w) * x
    ems[slot] <- sqrt(w * ems[slot]^2 + (1 - w) * (x - ema[slot])^2)
  }

  if (length(anomaly_ts) == 0) {
    return (data.frame())
  }
  return (data.frame(TS = anomaly_ts, Value = anomaly_values))
}
# R-scripts/hsdev.r
#
# Historical standard-deviation test: compares the median of the test window
# with the median of historic values observed on the same day of the week.
#
# train_data, test_data: data frames (or matrices); column 1 = timestamp in
#             milliseconds since the Unix epoch, column 2 = value. train_data is
#             scanned from its last row backwards and the scan stops at the
#             first timestamp older than the history window, so it is expected
#             to be sorted by ascending timestamp.
# n:          alarm threshold, in multiples of the historic standard deviation.
# num_historic_periods, period: the history window starts
#             num_historic_periods * period before the first test timestamp.
# interval:   the window start is rounded down to a multiple of this value.
#
# Returns an empty data.frame() when there is not enough history or no anomaly,
# otherwise a one-row data frame with columns "TS Start", "TS End",
# "Current Median", "Past Median" and "Past SD".
hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {

  anomalies <- data.frame()

  num_test_rows <- nrow(test_data)
  test_start <- test_data[1, 1]
  # Last ROW of the test window (indexing by the column count would pick row 2).
  test_end <- test_data[num_test_rows, 1]

  train_start <- test_start - num_historic_periods * period
  # round down to a multiple of `interval` (start of day when interval is one day)
  train_start <- train_start - (train_start %% interval)

  time <- as.POSIXlt(as.numeric(test_start) / 1000, origin = "1970-01-01", tz = "GMT")
  test_data_day <- time$wday

  # Collect historic values that fall on the same weekday as the test window,
  # newest first. rev(seq_len()) yields no iterations for empty training data.
  h_data <- c()
  for (i in rev(seq_len(nrow(train_data)))) {
    ts <- train_data[i, 1]
    if (ts < train_start) {
      break
    }
    time <- as.POSIXlt(as.numeric(ts) / 1000, origin = "1970-01-01", tz = "GMT")
    if (time$wday == test_data_day) {
      h_data <- c(h_data, train_data[i, 2])
    }
  }

  # Require at least twice as many historic points as test points.
  if (length(h_data) < 2 * num_test_rows) {
    cat ("\nNot enough training data")
    return (anomalies)
  }

  past_median <- median(h_data)
  past_sd <- sd(h_data)
  curr_median <- median(test_data[, 2])

  if (abs(curr_median - past_median) > n * past_sd) {
    anomalies <- data.frame(test_start, test_end, curr_median, past_median, past_sd)
    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
  }

  return (anomalies)
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# R-scripts/iforest.R
#
# Multi-metric anomaly detection with an isolation forest.
#
# url:             passed to get_data(); the result is expected to expose
#                  res$metrics, a list whose entries carry `metricname` and a
#                  named collection `metrics` (names = timestamps, values =
#                  metric values).
# train_start, train_end, test_start, test_end: inclusive timestamp bounds of
#                  the training and test windows.
# threshold_score: test points whose anomaly score exceeds this are reported.
#
# get_data(), IsolationTrees() and AnomalyScore() are defined outside this
# script (NOTE(review): presumably the IsolationForest package - confirm).
#
# Returns an empty data.frame() when nothing is flagged, otherwise a data frame
# with columns "TS", "Anomaly Score" and "Path length".
ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {

  res <- get_data(url)
  num_metrics <- length(res$metrics)
  anomalies <- data.frame()

  if (num_metrics == 0) {
    return (anomalies)
  }

  # Build one wide frame: TS plus one column per metric. merge() joins on the
  # column names the frames have in common (TS for distinctly named metrics).
  # seq_len() also handles the single-metric case, where 2:num_metrics would
  # count down (2, 1) and index a metric that does not exist.
  data <- NULL
  for (i in seq_len(num_metrics)) {
    metric <- res$metrics[[i]]
    df <- data.frame(as.numeric(names(metric$metrics)), as.numeric(metric$metrics))
    names(df) <- c("TS", metric$metricname)
    if (is.null(data)) {
      data <- df
    } else {
      data <- merge(data, df)
    }
  }

  # Every column except the leading TS column is a feature.
  feature_cols <- 2:(num_metrics + 1)

  # Select rows by the timestamps of the MERGED frame; the per-metric frame of
  # the last metric can have different rows than the merge result.
  algo_data <- data[which(data$TS >= train_start & data$TS <= train_end), ][feature_cols]
  iForest <- IsolationTrees(algo_data)
  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]

  if_res <- AnomalyScore(test_data[feature_cols], iForest)
  for (i in seq_along(if_res$outF)) {
    if (if_res$outF[i] > threshold_score) {
      anomaly <- c(test_data[i, 1], if_res$outF[i], if_res$pathLength[i])
      anomalies <- rbind(anomalies, anomaly)
    }
  }

  if (length(anomalies) > 0) {
    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
  }
  return (anomalies)
}
# R-scripts/kstest.r
#
# Two-sample Kolmogorov-Smirnov test between the training and test windows.
#
# train_data, test_data: data frames (or matrices); column 1 = timestamp,
#            column 2 = value.
# p_value:   significance level; the test window is reported when the p-value
#            of the KS test is below it.
#
# Returns an empty data.frame() when the windows are not significantly
# different, otherwise a one-row data frame with columns "TS Start", "TS End",
# "D" (the KS statistic) and "p-value".
ams_ks <- function(train_data, test_data, p_value) {

  anomalies <- data.frame()
  res <- ks.test(train_data[, 2], test_data[, 2])

  if (res$p.value < p_value) {
    ts_start <- test_data[1, 1]
    # Last ROW of the test window (length() of a data frame is its column count).
    ts_end <- test_data[nrow(test_data), 1]
    anomalies <- data.frame(ts_start, ts_end, unname(res$statistic), res$p.value)
    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
  }

  return (anomalies)
}
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# R-scripts/test.R
#
# Driver that replays a metric series day by day through the anomaly detection
# methods and accumulates what they report in the global frames below.

tukeys_anomalies <- data.frame()
ema_global_anomalies <- data.frame()
ema_daily_anomalies <- data.frame()
ks_anomalies <- data.frame()
hsdev_anomalies <- data.frame()

# Resets the global accumulators. `<<-` is required: a plain `<-` inside the
# function would only create local variables and leave the globals untouched,
# so results of an earlier test_methods() run would leak into the next one.
init <- function() {
  tukeys_anomalies <<- data.frame()
  ema_global_anomalies <<- data.frame()
  ema_daily_anomalies <<- data.frame()
  ks_anomalies <<- data.frame()
  hsdev_anomalies <<- data.frame()
}

# Walks `data` in consecutive train/test windows delimited by
# get_next_day_boundary() and runs the enabled detectors on each window.
#
# data: data frame whose first column is named "TS" (timestamps, evenly spaced;
#       the step is taken from the first two rows) and whose second column
#       holds the metric value.
#
# Only hsdev_daily() (defined in hsdev.r) is currently enabled; it is trained
# on everything before the test window. Returns the accumulated
# hsdev_anomalies frame. Prints the window counter as a progress indicator.
test_methods <- function(data) {

  init()

  limit <- data[nrow(data), 1]
  step <- data[2, 1] - data[1, 1]

  train_start <- data[1, 1]
  train_end <- get_next_day_boundary(train_start, step, limit)
  test_start <- train_end + step
  test_end <- get_next_day_boundary(test_start, step, limit)
  i <- 1
  day <- 24*60*60*1000

  while (test_start < limit) {

    print (i)
    i <- i + 1
    train_data <- data[which(data$TS >= train_start & data$TS <= train_end), ]
    test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]

    # Detectors that are currently disabled:
    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
    hsdev_train_data <- data[which(data$TS < test_start), ]
    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day))

    # Slide forward: the previous test window becomes the next training window.
    train_start <- test_start
    train_end <- get_next_day_boundary(train_start, step, limit)
    test_start <- train_end + step
    test_end <- get_next_day_boundary(test_start, step, limit)
  }
  return (hsdev_anomalies)
}

# Returns the first timestamp >= start (advancing in increments of `step`) that
# lies exactly `boundary_offset` milliseconds after a multiple of 24 hours.
#
# start, limit:    timestamps in milliseconds.
# step:            spacing between consecutive timestamps.
# boundary_offset: offset of the day boundary from midnight UTC, in
#                  milliseconds. The default of 28800000 is 8 hours
#                  (NOTE(review): presumably local midnight in a UTC-8
#                  timezone - confirm).
#
# Returns -1 when start is already past limit. When no boundary is found up to
# limit, the first stepped timestamp beyond limit is returned.
get_next_day_boundary <- function(start, step, limit, boundary_offset = 28800000) {

  if (start > limit) {
    return (-1)
  }

  day_millis <- 24*60*60*1000
  while (start <= limit) {
    if (((start %% day_millis) - boundary_offset) == 0) {
      return (start)
    }
    start <- start + step
  }
  return (start)
}
# R-scripts/tukeys.r
#
# Tukey's fences outlier test.
#
# train_data, test_data: data frames (or matrices); column 1 = timestamp,
#            column 2 = value.
# n:         fence width, in multiples of the inter-quartile range (IQR) of the
#            training values.
#
# A test value is an outlier when it lies below Q1 - n*IQR or above Q3 + n*IQR.
# Returns an empty data.frame() when nothing is flagged (including when
# test_data has no rows), otherwise a data frame with columns "TS", "Value" and
# "niqr", where niqr is the distance of the value from the nearest quartile in
# multiples of the IQR (0 when the IQR is 0). Test values that are NA are not
# reported.
ams_tukeys <- function(train_data, test_data, n) {

  quantiles <- quantile(train_data[, 2])
  q1 <- unname(quantiles[2])
  q3 <- unname(quantiles[4])
  iqr <- q3 - q1

  # The fences depend only on the training data, so compute them once.
  lb <- q1 - n * iqr
  ub <- q3 + n * iqr

  ts <- test_data[, 1]
  values <- test_data[, 2]
  outliers <- which(values < lb | values > ub)

  if (length(outliers) == 0) {
    return (data.frame())
  }

  x <- values[outliers]
  niqr <- rep(0, length(x))
  if (iqr != 0) {
    niqr <- ifelse(x < lb, (q1 - x) / iqr, (x - q3) / iqr)
  }

  return (data.frame(TS = ts[outliers], Value = x, niqr = niqr))
}
# The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Application ids whose metrics are analysed.
appIds=HOST

# Metrics collector endpoint.
collectorHost=localhost
collectorPort=6188
collectorProtocol=http

zkQuorum=localhost:2181

ambariServerHost=localhost
clusterName=c1

# EMA weight (w) and alarm threshold (n, in multiples of EMS); Tukey fence width.
emaW=0.8
emaN=3
tukeysN=3
# NOTE(review): interval values look like milliseconds - confirm against the reader.
pointInTimeTestInterval=300000
pointInTimeTrainInterval=900000

ksTestInterval=600000
ksTrainInterval=600000
hsdevNhp=3
# No trailing ';' here: java.util.Properties keeps it as part of the value,
# which then cannot be parsed as a number.
hsdevInterval=1800000

skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
hosts=avijayan-ad-1.openstacklocal
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.spark + + +import java.io.{FileInputStream, IOException, InputStream} +import java.util +import java.util.Properties +import java.util.logging.LogManager + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.ambari.metrics.alertservice.prototype.core.MetricsCollectorInterface +import org.apache.spark.SparkConf +import org.apache.spark.streaming._ +import org.apache.spark.streaming.kafka._ +import org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly} +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique} +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics +import org.apache.log4j.Logger +import org.apache.spark.storage.StorageLevel + +object MetricAnomalyDetector { + + /* + Load current EMA model + Filter step - Check if anomaly + Collect / Write to AMS / Print. 
+ */ + +// var brokers = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181" +// var groupId = "ambari-metrics-group" +// var topicName = "ambari-metrics-topic" +// var numThreads = 1 +// val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]() +// +// def readProperties(propertiesFile: String): Properties = try { +// val properties = new Properties +// var inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile) +// if (inputStream == null) inputStream = new FileInputStream(propertiesFile) +// properties.load(inputStream) +// properties +// } catch { +// case ioEx: IOException => +// null +// } +// +// def main(args: Array[String]): Unit = { +// +// @transient +// lazy val log = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger") +// +// if (args.length < 1) { +// System.err.println("Usage: MetricSparkConsumer <input-config-file>") +// System.exit(1) +// } +// +// //Read properties +// val properties = readProperties(propertiesFile = args(0)) +// +// //Load EMA parameters - w, n +// val emaW = properties.getProperty("emaW").toDouble +// val emaN = properties.getProperty("emaN").toDouble +// +// //collector info +// val collectorHost: String = properties.getProperty("collectorHost") +// val collectorPort: String = properties.getProperty("collectorPort") +// val collectorProtocol: String = properties.getProperty("collectorProtocol") +// val anomalyMetricPublisher = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort) +// +// //Instantiate Kafka stream reader +// val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector") +// val streamingContext = new StreamingContext(sparkConf, Duration(10000)) +// +// val topicsSet = topicName.toSet +// val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) +//// val stream = KafkaUtils.createDirectStream() +// +// val kafkaStream = 
KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2) +// kafkaStream.print() +// +// var timelineMetricsStream = kafkaStream.map( message => { +// val mapper = new ObjectMapper +// val metrics = mapper.readValue(message._2, classOf[TimelineMetrics]) +// metrics +// }) +// timelineMetricsStream.print() +// +// var appMetricStream = timelineMetricsStream.map( timelineMetrics => { +// (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics) +// }) +// appMetricStream.print() +// +// var filteredAppMetricStream = appMetricStream.filter( appMetricTuple => { +// appIds.contains(appMetricTuple._1) +// } ) +// filteredAppMetricStream.print() +// +// filteredAppMetricStream.foreachRDD( rdd => { +// rdd.foreach( appMetricTuple => { +// val timelineMetrics = appMetricTuple._2 +// logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName) +// log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName) +// for (timelineMetric <- timelineMetrics.getMetrics) { +// var anomalies = emaModel.test(timelineMetric) +// anomalyMetricPublisher.publish(anomalies) +// } +// }) +// }) +// +// streamingContext.start() +// streamingContext.awaitTermination() +// } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala new file mode 100644 index 0000000..ccded6b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala @@ -0,0 +1,78 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.spark + +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +object SparkPhoenixReader { + + def main(args: Array[String]) { + + if (args.length < 6) { + System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") + System.exit(1) + } + + var metricName = args(0) + var appId = args(1) + var hostname = args(2) + var weight = args(3).toDouble + var timessdev = args(4).toInt + var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure + var modelDir = args(6) + + val conf = new SparkConf() + conf.set("spark.app.name", "AMSAnomalyModelBuilder") + //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") + + var sc = new SparkContext(conf) + val sqlContext = new SQLContext(sc) + + val currentTime = System.currentTimeMillis() + val oneDayBack = currentTime - 24*60*60*1000 + + val df = sqlContext.load("org.apache.phoenix.spark", 
Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) + df.registerTempTable("METRIC_RECORD") + val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + + "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) + + var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] + result.collect().foreach( + t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) + ) + + //val seriesName = result.head().getString(0) + //val hostname = result.head().getString(1) + //val appId = result.head().getString(2) + + val timelineMetric = new TimelineMetric() + timelineMetric.setMetricName(metricName) + timelineMetric.setAppId(appId) + timelineMetric.setHostName(hostname) + timelineMetric.setMetricValues(metricValues) + + var emaModel = new EmaTechnique(weight, timessdev) + emaModel.test(timelineMetric) + emaModel.save(sc, modelDir) + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java new file mode 100644 index 0000000..a0b06e6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype; + +import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; +import java.util.TreeMap; + +import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS; + +public class TestEmaTechnique { + + private static double[] ts; + private static String fullFilePath; + + @BeforeClass + public static void init() throws URISyntaxException { + + Assume.assumeTrue(System.getenv("R_HOME") != null); + ts = getTS(1000); + URL url = ClassLoader.getSystemResource("R-scripts"); + fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + } + + @Test + public void testEmaInitialization() { + + EmaTechnique ema = new EmaTechnique(0.5, 3); + Assert.assertTrue(ema.getTrackedEmas().isEmpty()); + 
Assert.assertTrue(ema.getStartingWeight() == 0.5); + Assert.assertTrue(ema.getStartTimesSdev() == 2); + } + + @Test + public void testEma() { + EmaTechnique ema = new EmaTechnique(0.5, 3); + + long now = System.currentTimeMillis(); + + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("M1"); + metric1.setHostName("H1"); + metric1.setStartTime(now - 1000); + metric1.setAppId("A1"); + metric1.setInstanceId(null); + metric1.setType("Integer"); + + //Train + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + for (int i = 0; i < 50; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 100, metric); + } + metric1.setMetricValues(metricValues); + List<MetricAnomaly> anomalyList = ema.test(metric1); +// Assert.assertTrue(anomalyList.isEmpty()); + + metricValues = new TreeMap<Long, Double>(); + for (int i = 0; i < 50; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 100, metric); + } + metric1.setMetricValues(metricValues); + anomalyList = ema.test(metric1); + Assert.assertTrue(!anomalyList.isEmpty()); + int l1 = anomalyList.size(); + + Assert.assertTrue(ema.updateModel(metric1, false, 20)); + anomalyList = ema.test(metric1); + int l2 = anomalyList.size(); + Assert.assertTrue(l2 < l1); + + Assert.assertTrue(ema.updateModel(metric1, true, 50)); + anomalyList = ema.test(metric1); + int l3 = anomalyList.size(); + Assert.assertTrue(l3 > l2 && l3 > l1); + + } +}
