http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java new file mode 100644 index 0000000..251603b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.methods; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class MetricAnomaly implements Serializable{ + + private String methodType; + private double anomalyScore; + private String metricKey; + private long timestamp; + private double metricValue; + + + public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) { + this.metricKey = metricKey; + this.timestamp = timestamp; + this.metricValue = metricValue; + this.methodType = methodType; + this.anomalyScore = anomalyScore; + + } + + public String getMethodType() { + return methodType; + } + + public void setMethodType(String methodType) { + this.methodType = methodType; + } + + public double getAnomalyScore() { + return anomalyScore; + } + + public void setAnomalyScore(double anomalyScore) { + this.anomalyScore = anomalyScore; + } + + public void setMetricKey(String metricKey) { + this.metricKey = metricKey; + } + + public String getMetricKey() { + return metricKey; + } + + public void setMetricName(String metricName) { + this.metricKey = metricName; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public double getMetricValue() { + return metricValue; + } + + public void setMetricValue(double metricValue) { + this.metricValue = metricValue; + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java new file mode 100644 index 0000000..593028e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.methods.ema; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; + +import static org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold; + +@XmlRootElement +public class EmaModel implements Serializable { + + private String metricName; + private String hostname; + private String appId; + private double ema; + private double ems; + private double weight; + private double timessdev; + + private int ctr = 0; + + private static final Log LOG = LogFactory.getLog(EmaModel.class); + + public EmaModel(String name, String hostname, String appId, double weight, double timessdev) { + this.metricName = name; + this.hostname = hostname; + this.appId = appId; + this.weight = weight; + this.timessdev = timessdev; + this.ema = 0.0; + this.ems = 0.0; + } + + public String getMetricName() { + return metricName; + } + + public String getHostname() { + return hostname; + } + + public String getAppId() { + return appId; + } + + public double testAndUpdate(double metricValue) { + + double anomalyScore = 0.0; + LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev); + update(metricValue); + if (ctr > suppressAnomaliesTheshold) { + anomalyScore = test(metricValue); + if (anomalyScore > 0.0) { + LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + + ", timessdev = " + timessdev + ", metricValue = " + metricValue); + } else { + LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + + ", timessdev = " + timessdev + ", metricValue = " + metricValue); + } + } else { + ctr++; + if (ctr > suppressAnomaliesTheshold) { + LOG.info("Ema Model for " + metricName + ":" + 
appId + ":" + hostname + " is ready for testing data."); + } + } + return anomalyScore; + } + + public void update(double metricValue) { + ema = weight * ema + (1 - weight) * metricValue; + ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0)); + LOG.debug("In update : ema = " + ema + ", ems = " + ems); + } + + public double test(double metricValue) { + LOG.debug("In test : ema = " + ema + ", ems = " + ems); + double diff = Math.abs(ema - metricValue) - (timessdev * ems); + LOG.debug("diff = " + diff); + if (diff > 0) { + return Math.abs((metricValue - ema) / ems); //Z score + } else { + return 0.0; + } + } + + public void updateModel(boolean increaseSensitivity, double percent) { + LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); + double delta = percent / 100; + if (increaseSensitivity) { + delta = delta * -1; + } + this.timessdev = timessdev + delta * timessdev; + //this.weight = Math.min(1.0, weight + delta * weight); + LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight); + } + + public double getWeight() { + return weight; + } + + public void setWeight(double weight) { + this.weight = weight; + } + + public double getTimessdev() { + return timessdev; + } + + public void setTimessdev(double timessdev) { + this.timessdev = timessdev; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java new file mode 100644 index 0000000..7623f27 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.methods.ema; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.util.Loader; + +public class EmaModelLoader implements Loader<EmaTechnique> { + private static final Log LOG = LogFactory.getLog(EmaModelLoader.class); + + @Override + public EmaTechnique load(SparkContext sc, String path) { + return new EmaTechnique(0.5,3); +// Gson gson = new Gson(); +// try { +// String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8); +// return gson.fromJson(fileString, EmaTechnique.class); +// } catch (IOException e) { +// LOG.error(e); +// } +// return null; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java new file mode 100644 index 0000000..7ec17d8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.methods.ema; + +import com.google.gson.Gson; +import org.apache.ambari.metrics.adservice.prototype.methods.AnomalyDetectionTechnique; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.spark.SparkContext; +import org.apache.spark.mllib.util.Saveable; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.io.Writer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@XmlRootElement +public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable { + + @XmlElement(name = "trackedEmas") + private Map<String, EmaModel> trackedEmas; + private static final Log LOG = LogFactory.getLog(EmaTechnique.class); + + private double startingWeight = 0.5; + private double startTimesSdev = 3.0; + private String methodType = "ema"; + public static int suppressAnomaliesTheshold = 100; + + public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) { + trackedEmas = new HashMap<>(); + this.startingWeight = startingWeight; + this.startTimesSdev = startTimesSdev; + 
EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold; + LOG.info("New EmaTechnique......"); + } + + public EmaTechnique(double startingWeight, double startTimesSdev) { + trackedEmas = new HashMap<>(); + this.startingWeight = startingWeight; + this.startTimesSdev = startTimesSdev; + LOG.info("New EmaTechnique......"); + } + + public List<MetricAnomaly> test(TimelineMetric metric) { + String metricName = metric.getMetricName(); + String appId = metric.getAppId(); + String hostname = metric.getHostName(); + String key = metricName + ":" + appId + ":" + hostname; + + EmaModel emaModel = trackedEmas.get(key); + if (emaModel == null) { + LOG.debug("EmaModel not present for " + key); + LOG.debug("Number of tracked Emas : " + trackedEmas.size()); + emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev); + trackedEmas.put(key, emaModel); + } else { + LOG.debug("EmaModel already present for " + key); + } + + List<MetricAnomaly> anomalies = new ArrayList<>(); + + for (Long timestamp : metric.getMetricValues().keySet()) { + double metricValue = metric.getMetricValues().get(timestamp); + double anomalyScore = emaModel.testAndUpdate(metricValue); + if (anomalyScore > 0.0) { + LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore); + MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore); + anomalies.add(metricAnomaly); + } else { + LOG.debug("Discarding non-anomaly for : " + key); + } + } + return anomalies; + } + + public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) { + String metricName = timelineMetric.getMetricName(); + String appId = timelineMetric.getAppId(); + String hostname = timelineMetric.getHostName(); + String key = metricName + "_" + appId + "_" + hostname; + + + EmaModel emaModel = trackedEmas.get(key); + + if (emaModel == null) { + LOG.warn("EMA Model for " + key + " not found"); + return false; + } + 
emaModel.updateModel(increaseSensitivity, percent); + + return true; + } + + @Override + public void save(SparkContext sc, String path) { + Gson gson = new Gson(); + try { + String json = gson.toJson(this); + try (Writer writer = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(path), "utf-8"))) { + writer.write(json); + } + } catch (IOException e) { + LOG.error(e); + } + } + + @Override + public String formatVersion() { + return "1.0"; + } + + public Map<String, EmaModel> getTrackedEmas() { + return trackedEmas; + } + + public double getStartingWeight() { + return startingWeight; + } + + public double getStartTimesSdev() { + return startTimesSdev; + } + +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java new file mode 100644 index 0000000..6facc99 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.methods.hsdev; + +import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median; +import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev; + +import java.io.Serializable; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class HsdevTechnique implements Serializable { + + private Map<String, Double> hsdevMap; + private String methodType = "hsdev"; + private static final Log LOG = LogFactory.getLog(HsdevTechnique.class); + + public HsdevTechnique() { + hsdevMap = new HashMap<>(); + } + + public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) { + int testLength = testData.values.length; + int trainLength = trainData.values.length; + + if (trainLength < testLength) { + LOG.info("Not enough train data."); + return null; + } + + if (!hsdevMap.containsKey(key)) { + hsdevMap.put(key, 3.0); + } + + double n = hsdevMap.get(key); + + double historicSd = sdev(trainData.values, false); + double historicMedian = median(trainData.values); + double currentMedian = median(testData.values); + + + if (historicSd > 0) { + double diff = Math.abs(currentMedian - historicMedian); + LOG.info("Found anomaly for metric : " + key + " in the period ending " + new 
Date((long)testData.ts[testLength - 1])); + LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); + + if (diff > n * historicSd) { + double zScore = diff / historicSd; + LOG.info("Z Score of current series : " + zScore); + return new MetricAnomaly(key, + (long) testData.ts[testLength - 1], + testData.values[testLength - 1], + methodType, + zScore); + } + } + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java new file mode 100644 index 0000000..4727c6f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.prototype.methods.kstest; + +import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class KSTechnique implements Serializable { + + private String methodType = "ks"; + private Map<String, Double> pValueMap; + private static final Log LOG = LogFactory.getLog(KSTechnique.class); + + public KSTechnique() { + pValueMap = new HashMap(); + } + + public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) { + + int testLength = testData.values.length; + int trainLength = trainData.values.length; + + if (trainLength < testLength) { + LOG.info("Not enough train data."); + return null; + } + + if (!pValueMap.containsKey(key)) { + pValueMap.put(key, 0.05); + } + double pValue = pValueMap.get(key); + + ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue))); + if (result == null) { + LOG.error("Resultset is null when invoking KS R function..."); + return null; + } + + if (result.resultset.size() > 0) { + + LOG.info("Is size 1 ? 
result size = " + result.resultset.get(0).length); + LOG.info("p_value = " + result.resultset.get(3)[0]); + double dValue = result.resultset.get(2)[0]; + + return new MetricAnomaly(key, + (long) testData.ts[testLength - 1], + testData.values[testLength - 1], + methodType, + dValue); + } + + return null; + } + + public void updateModel(String metricKey, boolean increaseSensitivity, double percent) { + + LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); + + if (!pValueMap.containsKey(metricKey)) { + LOG.error("Unknown metric key : " + metricKey); + LOG.info("pValueMap :" + pValueMap.toString()); + return; + } + + double delta = percent / 100; + if (!increaseSensitivity) { + delta = delta * -1; + } + + double pValue = pValueMap.get(metricKey); + double newPValue = Math.min(1.0, pValue + delta * pValue); + pValueMap.put(metricKey, newPValue); + LOG.info("New pValue = " + newPValue); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java new file mode 100644 index 0000000..9a002a1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.prototype.testing.utilities; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.List; +import java.util.Map; + +@XmlRootElement +public class MetricAnomalyDetectorTestInput { + + public MetricAnomalyDetectorTestInput() { + } + + //Train data + private String trainDataName; + private String trainDataType; + private Map<String, String> trainDataConfigs; + private int trainDataSize; + + //Test data + private String testDataName; + private String testDataType; + private Map<String, String> testDataConfigs; + private int testDataSize; + + //Algorithm data + private List<String> methods; + private Map<String, String> methodConfigs; + + public String getTrainDataName() { + return trainDataName; + } + + public void setTrainDataName(String trainDataName) { + this.trainDataName = trainDataName; + } + + public String getTrainDataType() { + return trainDataType; + } + + public void setTrainDataType(String trainDataType) { + this.trainDataType = trainDataType; + } + + public Map<String, String> getTrainDataConfigs() { + return trainDataConfigs; + } + + public void setTrainDataConfigs(Map<String, String> trainDataConfigs) { + this.trainDataConfigs = trainDataConfigs; + } + + public String getTestDataName() { + 
return testDataName; + } + + public void setTestDataName(String testDataName) { + this.testDataName = testDataName; + } + + public String getTestDataType() { + return testDataType; + } + + public void setTestDataType(String testDataType) { + this.testDataType = testDataType; + } + + public Map<String, String> getTestDataConfigs() { + return testDataConfigs; + } + + public void setTestDataConfigs(Map<String, String> testDataConfigs) { + this.testDataConfigs = testDataConfigs; + } + + public Map<String, String> getMethodConfigs() { + return methodConfigs; + } + + public void setMethodConfigs(Map<String, String> methodConfigs) { + this.methodConfigs = methodConfigs; + } + + public int getTrainDataSize() { + return trainDataSize; + } + + public void setTrainDataSize(int trainDataSize) { + this.trainDataSize = trainDataSize; + } + + public int getTestDataSize() { + return testDataSize; + } + + public void setTestDataSize(int testDataSize) { + this.testDataSize = testDataSize; + } + + public List<String> getMethods() { + return methods; + } + + public void setMethods(List<String> methods) { + this.methods = methods; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java new file mode 100644 index 0000000..d079e66 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java @@ -0,0 +1,168 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.prototype.testing.utilities; + +import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface; +import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +/** + * Class which was originally used to send test series from AMS to Spark through Kafka. 
+ */ +public class MetricAnomalyTester { + +  // NOTE(review): every member of this class is commented out, so it is currently an empty placeholder and the imports at the top of this file are unused. If the code below is re-enabled: getAsTimelineMetric checks resultset.size() > 0 but then reads resultset.get(1) (the guard should be size() > 1), and getAsTimeSeries computes (values.length - 1) * 60 * 1000 in int arithmetic, which overflows for long series. +// public static String appId = MetricsCollectorInterface.serviceName; +// static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class); +// static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>(); +// +// public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException { +// +// long currentTime = System.currentTimeMillis(); +// TimelineMetrics timelineMetrics = new TimelineMetrics(); +// String hostname = InetAddress.getLocalHost().getHostName(); +// +// //Train data +// TimelineMetric metric1 = new TimelineMetric(); +// if (StringUtils.isNotEmpty(input.getTrainDataName())) { +// metric1 = timelineMetricMap.get(input.getTrainDataName()); +// if (metric1 == null) { +// metric1 = new TimelineMetric(); +// double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs()); +// metric1.setMetricName(input.getTrainDataName()); +// metric1.setAppId(appId); +// metric1.setHostName(hostname); +// metric1.setStartTime(currentTime); +// metric1.setInstanceId(null); +// metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries)); +// timelineMetricMap.put(input.getTrainDataName(), metric1); +// } +// timelineMetrics.getMetrics().add(metric1); +// } else { +// LOG.error("No train data name specified"); +// } +// +// //Test data +// TimelineMetric metric2 = new TimelineMetric(); +// if (StringUtils.isNotEmpty(input.getTestDataName())) { +// metric2 = timelineMetricMap.get(input.getTestDataName()); +// if (metric2 == null) { +// metric2 = new TimelineMetric(); +// double[] testSeries = 
MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs()); +// metric2.setMetricName(input.getTestDataName()); +// metric2.setAppId(appId); +// metric2.setHostName(hostname); +// metric2.setStartTime(currentTime); +//
metric2.setInstanceId(null); +// metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries)); +// timelineMetricMap.put(input.getTestDataName(), metric2); +// } +// timelineMetrics.getMetrics().add(metric2); +// } else { +// LOG.warn("No test data name specified"); +// } +// +// //Invoke method +// if (CollectionUtils.isNotEmpty(input.getMethods())) { +// RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts"); +// for (String methodType : input.getMethods()) { +// ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs()); +// TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname); +// if (timelineMetric != null) { +// timelineMetrics.getMetrics().add(timelineMetric); +// } +// } +// } else { +// LOG.warn("No anomaly method requested"); +// } +// +// return timelineMetrics; +// } +// +// +// private static TimelineMetric getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) { +// +// if (result == null) { +// return null; +// } +// +// TimelineMetric timelineMetric = new TimelineMetric(); +// if (methodType.equals("tukeys") || methodType.equals("ema")) { +// timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime); +// timelineMetric.setHostName(hostname); +// timelineMetric.setAppId(appId); +// timelineMetric.setInstanceId(null); +// timelineMetric.setStartTime(currentTime); +// +// TreeMap<Long, Double> metricValues = new TreeMap<>(); +// if (result.resultset.size() > 0) { +// double[] ts = result.resultset.get(0); +// double[] metrics = result.resultset.get(1); +// for (int i = 0; i < ts.length; i++) { +// if (i == 0) { +// timelineMetric.setStartTime((long) ts[i]); +// } +// metricValues.put((long) ts[i], metrics[i]); +// } +// } +//
timelineMetric.setMetricValues(metricValues); +// return timelineMetric; +// } +// return null; +// } +// +// +// private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) { +// +// long startTime = currentTime - (values.length - 1) * 60 * 1000; +// TreeMap<Long, Double> metricValues = new TreeMap<>(); +// +// for (int i = 0; i < values.length; i++) { +// metricValues.put(startTime, values[i]); +// startTime += (60 * 1000); +// } +// return metricValues; +// } +// +// private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) { +// +// TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues(); +// double[] timestamps = new double[metricValues.size()]; +// double[] values = new double[metricValues.size()]; +// int i = 0; +// +// for (Long timestamp : metricValues.keySet()) { +// timestamps[i] = timestamp; +// values[i++] = metricValues.get(timestamp); +// } +// return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" + timelineMetric.getHostName(), timestamps, values); +// } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java new file mode 100644 index 0000000..3b2605b --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java @@ -0,0 +1,92 @@ +/** + * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype.testing.utilities; + +/** + * Class which was originally used to send test series from AMS to Spark through Kafka. + * NOTE(review): the whole class body is commented out, including the Runnable declaration, so this is currently an empty placeholder. If it is re-enabled: the constructor swallows UnknownHostException with printStackTrace and leaves hostname null, and run() builds a raw {@code new TreeMap()}. + */ + +public class TestMetricSeriesGenerator { + //implements Runnable { + +// private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>(); +// private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class); +// private TimelineMetricStore metricStore; +// private String hostname; +// +// public TestMetricSeriesGenerator(TimelineMetricStore metricStore) { +// this.metricStore = metricStore; +// try { +// this.hostname = InetAddress.getLocalHost().getHostName(); +// } catch (UnknownHostException e) { +// e.printStackTrace(); +// } +// } +// +// public void addSeries(TestSeriesInputRequest inputRequest) { +// if (!configuredSeries.containsKey(inputRequest)) { +// AbstractMetricSeries metricSeries = 
MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs()); +// configuredSeries.put(inputRequest, metricSeries); +// LOG.info("Added series " + inputRequest.getSeriesName()); +// } +// } +// +// public void removeSeries(String
seriesName) { +// boolean isPresent = false; +// TestSeriesInputRequest tbd = null; +// for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) { +// if (inputRequest.getSeriesName().equals(seriesName)) { +// isPresent = true; +// tbd = inputRequest; +// } +// } +// if (isPresent) { +// LOG.info("Removing series " + seriesName); +// configuredSeries.remove(tbd); +// } else { +// LOG.info("Series not found : " + seriesName); +// } +// } +// +// @Override +// public void run() { +// long currentTime = System.currentTimeMillis(); +// TimelineMetrics timelineMetrics = new TimelineMetrics(); +// +// for (TestSeriesInputRequest input : configuredSeries.keySet()) { +// AbstractMetricSeries metricSeries = configuredSeries.get(input); +// TimelineMetric timelineMetric = new TimelineMetric(); +// timelineMetric.setMetricName(input.getSeriesName()); +// timelineMetric.setAppId("anomaly-engine-test-metric"); +// timelineMetric.setInstanceId(null); +// timelineMetric.setStartTime(currentTime); +// timelineMetric.setHostName(hostname); +// TreeMap<Long, Double> metricValues = new TreeMap(); +// metricValues.put(currentTime, metricSeries.nextValue()); +// timelineMetric.setMetricValues(metricValues); +// timelineMetrics.addOrMergeTimelineMetric(timelineMetric); +// LOG.info("Emitting metric with appId = " + timelineMetric.getAppId()); +// } +// try { +// LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series."); +// metricStore.putMetrics(timelineMetrics); +// } catch (Exception e) { +// LOG.error(e); +// } +// } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java ---------------------------------------------------------------------- diff --git
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java new file mode 100644 index 0000000..d7db9ca --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype.testing.utilities; + +import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; +import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; + +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Collections; +import java.util.Map; + +@XmlRootElement +public class TestSeriesInputRequest { + + private String seriesName; + private String seriesType; + private Map<String, String> configs; + + public TestSeriesInputRequest() { + } + + public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) { + this.seriesName = seriesName; + this.seriesType = seriesType; + this.configs = configs; + } + + public String getSeriesName() { + return seriesName; + } + + public void setSeriesName(String seriesName) { + this.seriesName = seriesName; + } + + public String getSeriesType() { + return seriesType; + } + + public void setSeriesType(String seriesType) { + this.seriesType = seriesType; + } + + public Map<String, String> getConfigs() { + return configs; + } + + public void setConfigs(Map<String, String> configs) { + this.configs = configs; + } + + @Override + public boolean equals(Object o) { + TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o; + return anotherInput.getSeriesName().equals(this.getSeriesName()); + } + + @Override + public int hashCode() { + return seriesName.hashCode(); + } + + public static void main(String[] args) { + + ObjectMapper objectMapper = new ObjectMapper(); + TestSeriesInputRequest testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value")); + try { + System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R new file mode 100644 index 0000000..0b66095 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+# EMA <- w * EMA + (1 - w) * x
+# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
+# Alarm = abs(x - EMA) > n * EMS
+
+# Global EMA/EMS detector: warms up a single EMA/EMS pair on train_data, then flags every
+# test point whose distance from the running EMA exceeds n * EMS. The pair keeps being
+# updated with every test point, flagged or not.
+#
+# train_data: warm-up values - either a numeric vector or a data frame whose second column
+#             holds the values (the same layout as test_data).
+# test_data:  data frame with timestamps in column 1 and values in column 2.
+# w:          weight of the previous EMA/EMS in each update (presumably in [0, 1]).
+# n:          alarm threshold in multiples of EMS.
+# Returns a data frame with columns TS and Value; an empty data frame when nothing is flagged.
+ema_global <- function(train_data, test_data, w, n) {
+
+# res <- get_data(url)
+# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+# names(data) <- c("TS", res$metrics[[1]]$metricname)
+# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  anomalies <- data.frame()
+  ema <- 0
+  ems <- 0
+
+  # Accept a two-column data frame as well as a plain vector: looping over a data frame
+  # directly walks its columns (timestamps included) instead of its values.
+  train_values <- if (is.data.frame(train_data)) train_data[, 2] else train_data
+
+  #Train Step
+  for (x in train_values) {
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+
+  # seq_len() yields nothing for an empty window; 1:0 would visit rows 1 and 0.
+  for (i in seq_len(nrow(test_data))) {
+    x <- test_data[i,2]
+    if (abs(x - ema) > n*ems) {
+      anomaly <- c(as.numeric(test_data[i,1]), x)
+      # print (anomaly)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema <- w*ema + (1-w)*x
+    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return (anomalies)
+}
+
+# Day-of-week EMA/EMS detector: keeps one EMA/EMS pair per weekday (GMT), warms them up on
+# train_data and flags test points that deviate from their weekday's EMA by more than n * EMS.
+#
+# train_data, test_data: data frames with millisecond epoch timestamps in column 1 and
+#                        values in column 2.
+# w, n: as for ema_global.
+# Returns a data frame with columns TS and Value; an empty data frame when nothing is flagged.
+ema_daily <- function(train_data, test_data, w, n) {
+
+# res <- get_data(url)
+# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+# names(data) <- c("TS", res$metrics[[1]]$metricname)
+# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), ]
+# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  anomalies <- data.frame()
+  ema <- vector("numeric", 7)
+  ems <- vector("numeric", 7)
+
+  #Train Step
+  for (i in seq_len(nrow(train_data))) {
+    x <- train_data[i,2]
+    time <- as.POSIXlt(as.numeric(train_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+    # wday runs 0 (Sunday)..6 and R vectors are 1-based, so shift by one - the same slot the
+    # test loop reads. Using wday directly dropped Sunday samples (ema[0] <- ... is a no-op)
+    # and trained Monday..Saturday into the slot the test loop uses for the preceding day.
+    slot <- time$wday + 1
+    ema[slot] <- w*ema[slot] + (1-w)*x
+    ems[slot] <- sqrt(w* ems[slot]^2 + (1 - w)*(x - ema[slot])^2)
+  }
+
+  for (i in seq_len(nrow(test_data))) {
+    x <- test_data[i,2]
+    time <- as.POSIXlt(as.numeric(test_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT")
+    slot <- time$wday + 1
+
+    if (abs(x - ema[slot]) > n*ems[slot]) {
+      anomaly <- c(as.numeric(test_data[i,1]), x)
+      # print (anomaly)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+    ema[slot] <- w*ema[slot] + (1-w)*x
+    ems[slot] <- sqrt(w* ems[slot]^2 + (1 - w)*(x - ema[slot])^2)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value")
+  }
+  return(anomalies)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r new file mode 100644 index 0000000..bca3366 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/hsdev.r @@ -0,0 +1,67 @@ +# + # Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+#
+
+# Historical-median detector: compares the median of test_data with the median of past
+# samples taken on the same weekday (GMT) as the first test point, looking back
+# num_historic_periods * period from the start of the test window.
+#
+# train_data: data frame with ms-epoch timestamps in column 1 and values in column 2;
+#             assumed sorted ascending (the backward scan stops at the first timestamp
+#             older than the history start).
+# test_data:  data frame with the same layout.
+# n:          alarm threshold in multiples of the historic standard deviation.
+# num_historic_periods, period: how far back to look (test.R passes 3 and one week).
+# interval:   the history start is rounded down to a multiple of this (test.R passes one day).
+# Returns one row (TS Start, TS End, Current Median, Past Median, Past SD) when the medians
+# differ by more than n * Past SD, otherwise an empty data frame.
+hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
+
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+  anomalies <- data.frame()
+
+  # Nothing to compare for an empty window (and test_data[1,1] would be NA).
+  if (nrow(test_data) == 0) {
+    return (anomalies)
+  }
+
+  test_start <- test_data[1,1]
+  # Timestamp of the last row (length(test_data[1,]) counted columns, not rows).
+  test_end <- test_data[nrow(test_data),1]
+  train_start <- test_start - num_historic_periods*period
+  # round down to a multiple of interval (start of day when interval is one day)
+  train_start <- train_start - (train_start %% interval)
+
+  time <- as.POSIXlt(as.numeric(test_start)/1000, origin = "1970-01-01" ,tz = "GMT")
+  test_data_day <- time$wday
+
+  # Walk the history backwards, keeping only samples from the test window's weekday.
+  # rev(seq_len()) is empty for an empty train_data, unlike 0:1.
+  h_data <- c()
+  for (i in rev(seq_len(nrow(train_data)))) {
+    ts <- train_data[i,1]
+    if ( ts < train_start) {
+      break
+    }
+    time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
+    if (time$wday == test_data_day) {
+      h_data <- c(h_data, train_data[i,2])
+    }
+  }
+
+  # Require at least twice as many historic points as test points before comparing.
+  if (length(h_data) < 2*nrow(test_data)) {
+    cat ("\nNot enough training data")
+    return (anomalies)
+  }
+
+  past_median <- median(h_data)
+  past_sd <- sd(h_data)
+  curr_median <- median(test_data[,2])
+
+  if (abs(curr_median - past_median) > n * past_sd) {
+    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
+  }
+
+  return (anomalies)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R new file mode 100644 index
0000000..8956400 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/iforest.R @@ -0,0 +1,52 @@ +# + # Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+#
+
+# Multivariate isolation-forest detector over every metric returned by get_data(url).
+# The per-metric series are inner-joined on TS (merge), a forest is trained on the rows
+# with TS in [train_start, train_end], and rows with TS in [test_start, test_end] whose
+# anomaly score exceeds threshold_score are reported.
+# NOTE(review): get_data, IsolationTrees and AnomalyScore are not defined in this file and
+# are presumably provided by the caller's environment (IsolationForest package) - confirm.
+# Returns a data frame with columns TS, Anomaly Score and Path length (empty when nothing
+# is flagged).
+ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
+
+  res <- get_data(url)
+  num_metrics <- length(res$metrics)
+  anomalies <- data.frame()
+
+  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  # seq_len(...)[-1] is empty for a single metric, whereas 2:1 counts down and indexes
+  # past the end of res$metrics.
+  for (i in seq_len(num_metrics)[-1]) {
+    df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics))
+    names(df) <- c("TS", res$metrics[[i]]$metricname)
+    data <- merge(data, df)
+  }
+
+  # Column 1 is TS; columns 2..(num_metrics + 1) hold one metric each.
+  metric_cols <- 2:(num_metrics + 1)
+  # Filter on the merged frame's own TS column: the rows of the last per-metric frame do
+  # not line up with the rows of the merged frame.
+  algo_data <- data[which(data$TS >= train_start & data$TS <= train_end), metric_cols, drop = FALSE]
+  iForest <- IsolationTrees(algo_data)
+  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  if_res <- AnomalyScore(test_data[metric_cols], iForest)
+  for (i in seq_along(if_res$outF)) {
+    if (if_res$outF[i] > threshold_score) {
+      anomaly <- c(test_data[i,1], if_res$outF[i], if_res$pathLength[i])
+      anomalies <- rbind(anomalies, anomaly)
+    }
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
+  }
+  return (anomalies)
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r new file mode 100644 index 0000000..f22bc15 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/kstest.r @@ -0,0 +1,38 @@ +# + # Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+#
+
+# Two-sample Kolmogorov-Smirnov detector: flags the whole test window when the value
+# distributions of train_data and test_data differ with a p-value below p_value.
+#
+# train_data, test_data: data frames with timestamps in column 1 and values in column 2.
+# Returns one row (TS Start, TS End, D, p-value) when the window is flagged, otherwise an
+# empty data frame.
+ams_ks <- function(train_data, test_data, p_value) {
+
+# res <- get_data(url)
+# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+# names(data) <- c("TS", res$metrics[[1]]$metricname)
+# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2]
+
+  anomalies <- data.frame()
+  res <- ks.test(train_data[,2], test_data[,2])
+
+  # Read the named htest fields: res[1] / res[2] are one-element lists, which made c(...)
+  # build a list instead of a numeric row. The window ends at the last row, not at row
+  # length(test_data) (the column count of a data frame).
+  if (res$p.value < p_value) {
+    anomaly <- c(test_data[1,1], test_data[nrow(test_data),1], unname(res$statistic), res$p.value)
+    anomalies <- rbind(anomalies, anomaly)
+  }
+
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
+  }
+  return (anomalies)
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R new file mode 100644 index 0000000..7650356 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/test.R @@ -0,0 +1,85 @@ +# + # Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+#
+
+
+# Per-method anomaly accumulators; test_methods appends to them with <<- across windows.
+tukeys_anomalies <- data.frame()
+ema_global_anomalies <- data.frame()
+ema_daily_anomalies <- data.frame()
+ks_anomalies <- data.frame()
+hsdev_anomalies <- data.frame()
+
+# Resets the global accumulators above. Uses <<- : plain <- only created locals, so repeated
+# test_methods() calls kept appending to the previous run's results.
+init <- function() {
+  tukeys_anomalies <<- data.frame()
+  ema_global_anomalies <<- data.frame()
+  ema_daily_anomalies <<- data.frame()
+  ks_anomalies <<- data.frame()
+  hsdev_anomalies <<- data.frame()
+}
+
+# Walks data one day-aligned window at a time: each iteration trains on one window and tests
+# the next, appending hsdev_daily results (the other detectors are commented out).
+# data: data frame with a TS column (ms epoch) and values in column 2; assumed evenly
+#       spaced, since the step is taken from the first two rows.
+# Returns the accumulated hsdev anomalies.
+test_methods <- function(data) {
+
+  init()
+  #res <- get_data(url)
+  #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+  #names(data) <- c("TS", res$metrics[[1]]$metricname)
+
+  limit <- data[length(data[,1]),1]
+  step <- data[2,1] - data[1,1]
+
+  train_start <- data[1,1]
+  train_end <- get_next_day_boundary(train_start, step, limit)
+  test_start <- train_end + step
+  test_end <- get_next_day_boundary(test_start, step, limit)
+  i <- 1
+  day <- 24*60*60*1000
+
+  while (test_start < limit) {
+
+    print (i)
+    i <- i + 1
+    train_data <- data[which(data$TS >= train_start & data$TS <= train_end),]
+    test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
+    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
+    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
+    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
+    # hsdev looks at all history before the test window, not just the previous day.
+    hsdev_train_data <- data[which(data$TS < test_start),]
+    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day))
+
+    train_start <- test_start
+    train_end <- get_next_day_boundary(train_start, step, limit)
+    test_start <- train_end + step
+    test_end <- get_next_day_boundary(test_start, step, limit)
+  }
+  return (hsdev_anomalies)
+}
+
+# Returns the first timestamp >= start (advancing by step) whose time of day, in ms since
+# 00:00 UTC, equals offset. The default offset of 28800000 ms (8 hours) puts the boundary at
+# 08:00 UTC - presumably local midnight for a UTC-8 cluster; confirm.
+# Returns -1 when start > limit, and the first timestamp past limit when no boundary is found.
+get_next_day_boundary <- function(start, step, limit, offset = 28800000) {
+
+  if (start > limit) {
+    return (-1)
+  }
+
+  day <- 24*60*60*1000
+  while (start <= limit) {
+    if ((start %% day) == offset) {
+      return (start)
+    }
+    start <- start + step
+  }
+  return (start)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r new file mode 100644 index 0000000..0312226 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/tukeys.r @@ -0,0 +1,51 @@ +# + # Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+
+
+# Tukey's-fences detector: a test point is anomalous when it lies more than n * IQR below
+# the first quartile or above the third quartile of the training values.
+# train_data, test_data: data frames with timestamps in column 1 and values in column 2.
+# Returns a data frame with columns TS, Value and niqr (distance beyond the violated
+# quartile in IQR units; 0 when the IQR is 0); empty when nothing is flagged.
+ams_tukeys <- function(train_data, test_data, n) {
+
+# res <- get_data(url)
+# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
+# names(data) <- c("TS", res$metrics[[1]]$metricname)
+# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2]
+# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
+
+  anomalies <- data.frame()
+  quantiles <- quantile(train_data[,2])
+  q1 <- quantiles[2]
+  q3 <- quantiles[4]
+  iqr <- q3 - q1
+  niqr <- 0
+
+  # The fences do not depend on the test point, so compute them once.
+  lb <- q1 - n*iqr
+  ub <- q3 + n*iqr
+
+  # seq_len() yields nothing for an empty window; 1:0 would read an NA row and make the
+  # fence comparison fail inside if().
+  for (i in seq_len(nrow(test_data))) {
+    x <- test_data[i,2]
+    if ( (x < lb) || (x > ub) ) {
+      if (iqr != 0) {
+        if (x < lb) {
+          niqr <- (q1 - x) / iqr
+        } else {
+          niqr <- (x - q3) / iqr
+        }
+      }
+      anomaly <- c(test_data[i,1], x, niqr)
+      anomalies <- rbind(anomalies, anomaly)
+    }
+  }
+  if(length(anomalies) > 0) {
+    names(anomalies) <- c("TS", "Value", "niqr")
+  }
+  return (anomalies)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties new file mode 100644 index 0000000..ab106c4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/input-config.properties @@ -0,0 +1,42 @@ +# Copyright 2011 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+
+appIds=HOST
+
+collectorHost=localhost
+collectorPort=6188
+collectorProtocol=http
+
+zkQuorum=localhost:2181
+
+ambariServerHost=localhost
+clusterName=c1
+
+emaW=0.8
+emaN=3
+tukeysN=3
+pointInTimeTestInterval=300000
+pointInTimeTrainInterval=900000
+
+ksTestInterval=600000
+ksTrainInterval=600000
+hsdevNhp=3
+# Values must be bare numbers: java.util.Properties keeps a trailing ';' as part of the value.
+hsdevInterval=1800000
+
+skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
+# NOTE(review): this host name looks developer-specific; set it for the target cluster.
+hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala new file mode 100644 index 0000000..6122f5e --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/MetricAnomalyDetector.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.spark.prototype + +import java.io.{FileInputStream, IOException, InputStream} +import java.util +import java.util.Properties +import java.util.logging.LogManager + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.ambari.metrics.adservice.prototype.core.MetricsCollectorInterface +import org.apache.spark.SparkConf +import org.apache.spark.streaming._ +import org.apache.spark.streaming.kafka._ +import org.apache.ambari.metrics.adservice.prototype.methods.{AnomalyDetectionTechnique, MetricAnomaly} +import org.apache.ambari.metrics.adservice.prototype.methods.ema.{EmaModelLoader, EmaTechnique} +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics +import org.apache.log4j.Logger +import org.apache.spark.storage.StorageLevel + +object MetricAnomalyDetector { + + /* Prototype Spark Streaming anomaly detector (work in progress). Intended flow: + Load current EMA model + Filter step - Check if anomaly + Collect / Write to AMS / Print. NOTE(review): the whole pipeline below is commented out, so this object currently declares no members and no main method. The commented code also uses zkQuorum, appIds, logger and emaModel, none of which is declared anywhere in this object, so it will not compile if simply uncommented. In addition, topicsSet (topicName.toSet) would be a set of characters rather than a set of topic names, and it is never used.
+ */ + +// var brokers = "avijayan-ams-1.openstacklocal:2181,avijayan-ams-2.openstacklocal:2181,avijayan-ams-3.openstacklocal:2181" +// var groupId = "ambari-metrics-group" +// var topicName = "ambari-metrics-topic" +// var numThreads = 1 +// val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = Array[AnomalyDetectionTechnique]() +// +// def readProperties(propertiesFile: String): Properties = try { +// val properties = new Properties +// var inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile) +// if (inputStream == null) inputStream = new FileInputStream(propertiesFile) +// properties.load(inputStream) +// properties +// } catch { +// case ioEx: IOException => +// null +// } +// +// def main(args: Array[String]): Unit = { +// +// @transient +// lazy val log = org.apache.log4j.LogManager.getLogger("MetricAnomalyDetectorLogger") +// +// if (args.length < 1) { +// System.err.println("Usage: MetricSparkConsumer <input-config-file>") +// System.exit(1) +// } +// +// //Read properties +// val properties = readProperties(propertiesFile = args(0)) +// +// //Load EMA parameters - w, n +// val emaW = properties.getProperty("emaW").toDouble +// val emaN = properties.getProperty("emaN").toDouble +// +// //collector info +// val collectorHost: String = properties.getProperty("collectorHost") +// val collectorPort: String = properties.getProperty("collectorPort") +// val collectorProtocol: String = properties.getProperty("collectorProtocol") +// val anomalyMetricPublisher = new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort) +// +// //Instantiate Kafka stream reader +// val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector") +// val streamingContext = new StreamingContext(sparkConf, Duration(10000)) +// +// val topicsSet = topicName.toSet +// val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) +//// val stream = KafkaUtils.createDirectStream() +// +// val kafkaStream = 
KafkaUtils.createStream(streamingContext, zkQuorum, groupId, Map(topicName -> numThreads), StorageLevel.MEMORY_AND_DISK_SER_2) +// kafkaStream.print() +// +// var timelineMetricsStream = kafkaStream.map( message => { +// val mapper = new ObjectMapper +// val metrics = mapper.readValue(message._2, classOf[TimelineMetrics]) +// metrics +// }) +// timelineMetricsStream.print() +// +// var appMetricStream = timelineMetricsStream.map( timelineMetrics => { +// (timelineMetrics.getMetrics.get(0).getAppId, timelineMetrics) +// }) +// appMetricStream.print() +// +// var filteredAppMetricStream = appMetricStream.filter( appMetricTuple => { +// appIds.contains(appMetricTuple._1) +// } ) +// filteredAppMetricStream.print() +// +// filteredAppMetricStream.foreachRDD( rdd => { +// rdd.foreach( appMetricTuple => { +// val timelineMetrics = appMetricTuple._2 +// logger.info("Received Metric (1): " + timelineMetrics.getMetrics.get(0).getMetricName) +// log.info("Received Metric (2): " + timelineMetrics.getMetrics.get(0).getMetricName) +// for (timelineMetric <- timelineMetrics.getMetrics) { +// var anomalies = emaModel.test(timelineMetric) +// anomalyMetricPublisher.publish(anomalies) +// } +// }) +// }) +// +// streamingContext.start() +// streamingContext.awaitTermination() +// } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala new file mode 100644 index 0000000..6e1ae07 --- /dev/null +++
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.adservice.spark.prototype + +import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + +/** Spark batch job: reads the last 24 hours of one metric series (metric name, app id, host) from the Phoenix METRIC_RECORD table, feeds it to an EmaTechnique model and persists the model with EmaTechnique.save to the given model directory. */ object SparkPhoenixReader { + + /** Entry point. Expects the seven positional arguments shown in the usage message (extra arguments are ignored). Prints the usage and exits with status 1 when fewer are given. */ def main(args: Array[String]) { + + /* Seven arguments are read below, args(0) through args(6). The previous "fewer than 6" check let a 6-argument call reach args(6) and fail with an index error instead of printing the usage. 
*/ if (args.length < 7) { + System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>") + System.exit(1) + } + + val metricName = args(0) + val appId = args(1) + val hostname = args(2) + val weight = args(3).toDouble + val timessdev = args(4).toInt + val phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure + val modelDir = args(6) + + val conf = new SparkConf() + conf.set("spark.app.name", "AMSAnomalyModelBuilder") + //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077") + + val sc = new SparkContext(conf) +
val sqlContext = new SQLContext(sc) + + val currentTime = System.currentTimeMillis() + val oneDayBack = currentTime - 24*60*60*1000 + + val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString)) + df.registerTempTable("METRIC_RECORD") + /* NOTE(review): the filter values come straight from the command line and are concatenated into the SQL text, so a single quote in any of them breaks or alters the query. */ val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " + + "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack) + + /* Each point is METRIC_SUM / METRIC_COUNT keyed by SERVER_TIME (columns 4, 5 and 3 of the SELECT above). NOTE(review): a METRIC_COUNT of 0 would yield Infinity or NaN here. */ val metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double] + result.collect().foreach( + t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5)) + ) + + //val seriesName = result.head().getString(0) + //val hostname = result.head().getString(1) + //val appId = result.head().getString(2) + + val timelineMetric = new TimelineMetric() + timelineMetric.setMetricName(metricName) + timelineMetric.setAppId(appId) + timelineMetric.setHostName(hostname) + timelineMetric.setMetricValues(metricValues) + + val emaModel = new EmaTechnique(weight, timessdev) + /* The return value of test() is discarded. Presumably it is called for its effect on the model state before saving; verify against EmaTechnique. 
*/ emaModel.test(timelineMetric) + emaModel.save(sc, modelDir) + + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java new file mode 100644 index 0000000..76a00a6 --- /dev/null +++
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestEmaTechnique.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.adservice.prototype; + +import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; +import java.util.TreeMap; + +import static org.apache.ambari.metrics.adservice.prototype.TestRFunctionInvoker.getTS; + + /** Tests for EmaTechnique. NOTE(review): the tests below do not call RFunctionInvoker, yet the whole class is skipped unless the R_HOME environment variable is set. 
*/ +public class TestEmaTechnique { + + private static double[] ts; + private static String fullFilePath; + + /** Skips the class unless R_HOME is set, then builds 1000 timestamps and points RFunctionInvoker at the R-scripts resource directory. Neither ts nor fullFilePath is read by the tests below. */ @BeforeClass + public static void init() throws URISyntaxException { + + Assume.assumeTrue(System.getenv("R_HOME") != null); + ts = getTS(1000); + URL url = ClassLoader.getSystemResource("R-scripts"); + fullFilePath = new
File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + } + + /** Checks the state of a freshly constructed EmaTechnique(0.5, 3). */ @Test + public void testEmaInitialization() { + + EmaTechnique ema = new EmaTechnique(0.5, 3); + Assert.assertTrue(ema.getTrackedEmas().isEmpty()); + Assert.assertTrue(ema.getStartingWeight() == 0.5); + /* NOTE(review): the constructor above receives 3 for this setting but 2 is asserted; confirm against EmaTechnique whether that is intended. */ Assert.assertTrue(ema.getStartTimesSdev() == 2); + } + + /** Runs two unseeded random series (20000 + Math.random()) through test(). It then checks that the anomaly count drops after updateModel(metric1, false, 20), and that after updateModel(metric1, true, 50) it exceeds both earlier counts. NOTE(review): the second series is drawn from the same distribution as the first yet is expected to yield anomalies; confirm this is deterministic given EmaTechnique's thresholds. */ @Test + public void testEma() { + EmaTechnique ema = new EmaTechnique(0.5, 3); + + long now = System.currentTimeMillis(); + + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("M1"); + metric1.setHostName("H1"); + metric1.setStartTime(now - 1000); + metric1.setAppId("A1"); + metric1.setInstanceId(null); + metric1.setType("Integer"); + + //Train + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + for (int i = 0; i < 50; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 100, metric); + } + metric1.setMetricValues(metricValues); + List<MetricAnomaly> anomalyList = ema.test(metric1); +// Assert.assertTrue(anomalyList.isEmpty()); + + metricValues = new TreeMap<Long, Double>(); + for (int i = 0; i < 50; i++) { + double metric = 20000 + Math.random(); + metricValues.put(now - i * 100, metric); + } + metric1.setMetricValues(metricValues); + anomalyList = ema.test(metric1); + Assert.assertTrue(!anomalyList.isEmpty()); + int l1 = 
anomalyList.size(); + + Assert.assertTrue(ema.updateModel(metric1, false, 20)); + anomalyList = ema.test(metric1); + int l2 = anomalyList.size(); + Assert.assertTrue(l2 < l1); + + Assert.assertTrue(ema.updateModel(metric1, true, 50)); + anomalyList = ema.test(metric1); + int l3 = anomalyList.size(); + Assert.assertTrue(l3 > l2 && l3 > l1); + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java ---------------------------------------------------------------------- diff --git
a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java new file mode 100644 index 0000000..98fa050 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/java/org/apache/ambari/metrics/adservice/prototype/TestRFunctionInvoker.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.adservice.prototype; + +import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; +import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; +import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; +import org.apache.ambari.metrics.adservice.seriesgenerator.UniformMetricSeries; +import org.apache.commons.lang.ArrayUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + /** Tests for RFunctionInvoker on generated series. The whole class is skipped through {@code Assume} in {@link #init()} unless the R_HOME environment variable is set. */ +public class TestRFunctionInvoker { + + private static String metricName = "TestMetric"; + private static double[] ts; + private static String fullFilePath; + + /** Skips the class unless R_HOME is set, then builds 1000 timestamps and points RFunctionInvoker at the R-scripts resource directory. */ @BeforeClass + public static void init() throws URISyntaxException { + + Assume.assumeTrue(System.getenv("R_HOME") != null); + ts = getTS(1000); + URL url = ClassLoader.getSystemResource("R-scripts"); + fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + } + + /** Trains tukeys() on 750 generated points and tests it on 250 more, with a 5.5 spike at index 50. It expects rs.resultset to hold two entries, with entry 1 starting at roughly 5.5. 
*/ @Test + public void testTukeys() throws URISyntaxException { + + double[] train_ts = ArrayUtils.subarray(ts, 0, 750); + double[] train_x = getRandomData(750); + DataSeries trainData = new DataSeries(metricName, train_ts, train_x); + + double[] test_ts = ArrayUtils.subarray(ts, 750, 1000); + double[] test_x = getRandomData(250); + test_x[50] = 5.5; //Anomaly + DataSeries testData = new DataSeries(metricName, test_ts, test_x); + Map<String, String> configs = new HashMap<String, String>(); + configs.put("tukeys.n", "3"); + + ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs); + /* Expected value first, matching JUnit's assertEquals(expected, actual) so failure messages read correctly. */ Assert.assertEquals(2, rs.resultset.size()); + Assert.assertEquals(5.5, rs.resultset.get(1)[0], 0.1); + + } + + /** Manual driver, not a JUnit test: prints the tukeys and hsdev results for generated series. Unlike the tests, it does not check R_HOME first. */ public static void main(String[] args) throws URISyntaxException { + + String metricName = "TestMetric"; + double[] ts = getTS(1000); + URL url = 
ClassLoader.getSystemResource("R-scripts"); + String fullFilePath = new File(url.toURI()).getAbsolutePath(); + RFunctionInvoker.setScriptsDir(fullFilePath); + + double[] train_ts = ArrayUtils.subarray(ts, 0, 750); + double[] train_x = getRandomData(750); + DataSeries trainData = new DataSeries(metricName, train_ts, train_x); + + double[] test_ts = ArrayUtils.subarray(ts, 750, 1000); + double[] test_x = getRandomData(250); + test_x[50] = 5.5; //Anomaly + DataSeries testData = new DataSeries(metricName, test_ts, test_x); + ResultSet rs; + + Map<String, String> configs = new HashMap<String, String>(); + + System.out.println("TUKEYS"); + configs.put("tukeys.n", "3"); + rs = RFunctionInvoker.tukeys(trainData, testData, configs); + rs.print(); + System.out.println("--------------"); + +// System.out.println("EMA Global"); +// configs.put("ema.n", "3"); +// configs.put("ema.w", "0.8"); +// rs = RFunctionInvoker.ema_global(trainData, testData, configs); +// rs.print(); +// System.out.println("--------------"); +// +// System.out.println("EMA Daily"); +// rs = RFunctionInvoker.ema_daily(trainData, testData, configs); +// rs.print(); +// System.out.println("--------------"); +// +// configs.put("ks.p_value", "0.00005"); +// System.out.println("KS Test"); +// rs = RFunctionInvoker.ksTest(trainData, testData, configs); +// rs.print(); +// System.out.println("--------------"); +// + ts = getTS(5000); + train_ts = ArrayUtils.subarray(ts, 0, 4800); + train_x = getRandomData(4800); + trainData = new DataSeries(metricName, train_ts, train_x); + test_ts = ArrayUtils.subarray(ts, 4800, 5000); + test_x = getRandomData(200); + for (int i = 0; i < 200; i++) { + test_x[i] = test_x[i] * 5; + } + testData = new DataSeries(metricName, test_ts, test_x); + configs.put("hsdev.n", "3"); + configs.put("hsdev.nhp", "3"); + configs.put("hsdev.interval", "86400000"); + configs.put("hsdev.period", "604800000"); + System.out.println("HSdev"); + rs = RFunctionInvoker.hsdev(trainData, testData, configs); + 
rs.print();
+ System.out.println("--------------"); + + } + + /** Returns {@code n} ascending timestamps (epoch millis stored as doubles) spaced five minutes apart. The last one is the current time rounded down to a five-minute boundary. */ static double[] getTS(int n) { + long currentTime = System.currentTimeMillis(); + double[] ts = new double[n]; + currentTime = currentTime - (currentTime % (5 * 60 * 1000)); + + for (int i = 0, j = n - 1; i < n; i++, j--) { + ts[j] = currentTime; + currentTime = currentTime - (5 * 60 * 1000); + } + return ts; + } + + /** Returns {@code n} points from a UniformMetricSeries built with fixed parameters. The commented-out lines are an unused java.util.Random-based alternative. */ static double[] getRandomData(int n) { + + UniformMetricSeries metricSeries = new UniformMetricSeries(10, 0.1,0.05, 0.6, 0.8, true); + return metricSeries.getSeries(n); + +// double[] metrics = new double[n]; +// Random random = new Random(); +// for (int i = 0; i < n; i++) { +// metrics[i] = random.nextDouble(); +// } +// return metrics; + } +}
