http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java deleted file mode 100644 index 8f1eba6..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/RFunctionInvoker.java +++ /dev/null @@ -1,222 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.prototype.core; - - -import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.rosuda.JRI.REXP; -import org.rosuda.JRI.RVector; -import org.rosuda.JRI.Rengine; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class RFunctionInvoker { - - static final Log LOG = LogFactory.getLog(RFunctionInvoker.class); - public static Rengine r = new Rengine(new String[]{"--no-save"}, false, null); - private static String rScriptDir = "/usr/lib/ambari-metrics-collector/R-scripts"; - - private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries testData) { - r.assign("train_ts", trainData.ts); - r.assign("train_x", trainData.values); - r.eval("train_data <- data.frame(train_ts,train_x)"); - r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")"); - - r.assign("test_ts", testData.ts); - r.assign("test_x", testData.values); - r.eval("test_data <- data.frame(test_ts,test_x)"); - r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")"); - } - - public static void setScriptsDir(String dir) { - rScriptDir = dir; - } - - public static ResultSet executeMethod(String methodType, DataSeries trainData, DataSeries testData, Map<String, String> configs) { - - ResultSet result; - switch (methodType) { - case "tukeys": - result = tukeys(trainData, testData, configs); - break; - case "ema": - result = ema_global(trainData, testData, configs); - break; - case "ks": - result = ksTest(trainData, testData, configs); - break; - case "hsdev": - result = hsdev(trainData, testData, configs); - break; - default: - result = tukeys(trainData, testData, configs); - break; - } - return result; - } - - public static ResultSet tukeys(DataSeries trainData, DataSeries testData, Map<String, String> 
configs) { - try { - - REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')"); - - double n = Double.parseDouble(configs.get("tukeys.n")); - r.eval("n <- " + n); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ams_tukeys(train_data, test_data, n)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet ema_global(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/ema.r" + "')"); - - int n = Integer.parseInt(configs.get("ema.n")); - r.eval("n <- " + n); - - double w = Double.parseDouble(configs.get("ema.w")); - r.eval("w <- " + w); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ema_global(train_data, test_data, w, n)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/ema.r" + "')"); - - int n = Integer.parseInt(configs.get("ema.n")); - r.eval("n <- " + n); - - double w = Double.parseDouble(configs.get("ema.w")); - r.eval("w <- " + w); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ema_daily(train_data, test_data, w, n)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return 
new ResultSet(result); - - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet ksTest(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/kstest.r" + "')"); - - double p_value = Double.parseDouble(configs.get("ks.p_value")); - r.eval("p_value <- " + p_value); - - loadDataSets(r, trainData, testData); - - r.eval("an <- ams_ks(train_data, test_data, p_value)"); - REXP exp = r.eval("an"); - RVector cont = (RVector) exp.getContent(); - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } - - public static ResultSet hsdev(DataSeries trainData, DataSeries testData, Map<String, String> configs) { - try { - r.eval("source('" + rScriptDir + "/hsdev.r" + "')"); - - int n = Integer.parseInt(configs.get("hsdev.n")); - r.eval("n <- " + n); - - int nhp = Integer.parseInt(configs.get("hsdev.nhp")); - r.eval("nhp <- " + nhp); - - long interval = Long.parseLong(configs.get("hsdev.interval")); - r.eval("interval <- " + interval); - - long period = Long.parseLong(configs.get("hsdev.period")); - r.eval("period <- " + period); - - loadDataSets(r, trainData, testData); - - r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, period)"); - REXP exp = r.eval("an2"); - RVector cont = (RVector) exp.getContent(); - - List<double[]> result = new ArrayList(); - for (int i = 0; i < cont.size(); i++) { - result.add(cont.at(i).asDoubleArray()); - } - return new ResultSet(result); - } catch (Exception e) { - LOG.error(e); - } finally { - r.end(); - } - return null; - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java deleted file mode 100644 index 80212b3..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendADSystem.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.prototype.core; - -import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; -import org.apache.ambari.metrics.adservice.prototype.methods.hsdev.HsdevTechnique; -import org.apache.ambari.metrics.adservice.prototype.methods.kstest.KSTechnique; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -public class TrendADSystem implements Serializable { - - private MetricsCollectorInterface metricsCollectorInterface; - private List<TrendMetric> trendMetrics; - - private long ksTestIntervalMillis = 10 * 60 * 1000; - private long ksTrainIntervalMillis = 10 * 60 * 1000; - private KSTechnique ksTechnique; - - private HsdevTechnique hsdevTechnique; - private int hsdevNumHistoricalPeriods = 3; - - private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>(); - private static final Log LOG = LogFactory.getLog(TrendADSystem.class); - private String inputFile = ""; - - public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface, - long ksTestIntervalMillis, - long ksTrainIntervalMillis, - int hsdevNumHistoricalPeriods) { - - this.metricsCollectorInterface = metricsCollectorInterface; - this.ksTestIntervalMillis = ksTestIntervalMillis; - this.ksTrainIntervalMillis = ksTrainIntervalMillis; - this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods; - - this.ksTechnique = new KSTechnique(); - this.hsdevTechnique = new HsdevTechnique(); - - trendMetrics = 
new ArrayList<>(); - } - - public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) { - readInputFile(inputFile); - - long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis; - LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " + - new Date(currentEndTime) + "], with train data period [" + new Date(ksTestIntervalStartTime - ksTrainIntervalMillis) - + " : " + new Date(ksTestIntervalStartTime) + "]"); - - for (TrendMetric metric : trendMetrics) { - String metricName = metric.metricName; - String appId = metric.appId; - String hostname = metric.hostname; - String key = metricName + ":" + appId + ":" + hostname; - - TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, ksTestIntervalStartTime - ksTrainIntervalMillis, - currentEndTime); - - if (ksData.getMetrics().isEmpty()) { - LOG.info("No metrics fetched for KS, metricKey = " + key); - continue; - } - - List<Double> trainTsList = new ArrayList<>(); - List<Double> trainDataList = new ArrayList<>(); - List<Double> testTsList = new ArrayList<>(); - List<Double> testDataList = new ArrayList<>(); - - for (TimelineMetric timelineMetric : ksData.getMetrics()) { - for (Long timestamp : timelineMetric.getMetricValues().keySet()) { - if (timestamp <= ksTestIntervalStartTime) { - trainDataList.add(timelineMetric.getMetricValues().get(timestamp)); - trainTsList.add((double) timestamp); - } else { - testDataList.add(timelineMetric.getMetricValues().get(timestamp)); - testTsList.add((double) timestamp); - } - } - } - - LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size()); - if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { - LOG.info("Not enough train/test data to perform KS analysis."); - continue; - } - - String ksTrainSeries = "KSTrainSeries"; - double[] trainTs = new double[trainTsList.size()]; - double[] trainData = 
new double[trainTsList.size()]; - for (int i = 0; i < trainTs.length; i++) { - trainTs[i] = trainTsList.get(i); - trainData[i] = trainDataList.get(i); - } - - String ksTestSeries = "KSTestSeries"; - double[] testTs = new double[testTsList.size()]; - double[] testData = new double[testTsList.size()]; - for (int i = 0; i < testTs.length; i++) { - testTs[i] = testTsList.get(i); - testData[i] = testDataList.get(i); - } - - LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length); - - DataSeries ksTrainData = new DataSeries(ksTrainSeries, trainTs, trainData); - DataSeries ksTestData = new DataSeries(ksTestSeries, testTs, testData); - - MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData); - if (metricAnomaly == null) { - LOG.info("No anomaly from KS test."); - } else { - LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric...."); - TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly, - ksTestIntervalStartTime, currentEndTime, ksTestIntervalStartTime - ksTrainIntervalMillis, ksTestIntervalStartTime); - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.addOrMergeTimelineMetric(timelineMetric); - metricsCollectorInterface.emitMetrics(timelineMetrics); - - trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly); - } - } - - if (trendMetrics.isEmpty()) { - LOG.info("No Trend metrics tracked!!!!"); - } - - } - - private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly, - long testStart, - long testEnd, - long trainStart, - long trainEnd) { - - TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(metricAnomaly.getMetricKey()); - timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType()); - timelineMetric.setInstanceId(null); - timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); - 
timelineMetric.setStartTime(testEnd); - HashMap<String, String> metadata = new HashMap<>(); - metadata.put("method", metricAnomaly.getMethodType()); - metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore())); - metadata.put("test-start-time", String.valueOf(testStart)); - metadata.put("train-start-time", String.valueOf(trainStart)); - metadata.put("train-end-time", String.valueOf(trainEnd)); - timelineMetric.setMetadata(metadata); - TreeMap<Long,Double> metricValues = new TreeMap<>(); - metricValues.put(testEnd, metricAnomaly.getMetricValue()); - timelineMetric.setMetricValues(metricValues); - return timelineMetric; - - } - - public void runHsdevMethod() { - - List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>(); - - for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) { - - long hsdevTestEnd = ksSingleRunKey.endTime; - long hsdevTestStart = ksSingleRunKey.startTime; - - long period = hsdevTestEnd - hsdevTestStart; - - long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period; - long hsdevTrainEnd = hsdevTestStart; - - LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " + - new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart) - + " : " + new Date(hsdevTrainEnd) + "]"); - - String metricName = ksSingleRunKey.metricName; - String appId = ksSingleRunKey.appId; - String hostname = ksSingleRunKey.hostname; - String key = metricName + "_" + appId + "_" + hostname; - - TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics( - metricName, - appId, - hostname, - hsdevTrainStart, - hsdevTestEnd); - - if (hsdevData.getMetrics().isEmpty()) { - LOG.info("No metrics fetched for HSDev, metricKey = " + key); - continue; - } - - List<Double> trainTsList = new ArrayList<>(); - List<Double> trainDataList = new ArrayList<>(); - List<Double> testTsList = new ArrayList<>(); - List<Double> testDataList = new ArrayList<>(); - - for (TimelineMetric 
timelineMetric : hsdevData.getMetrics()) { - for (Long timestamp : timelineMetric.getMetricValues().keySet()) { - if (timestamp <= hsdevTestStart) { - trainDataList.add(timelineMetric.getMetricValues().get(timestamp)); - trainTsList.add((double) timestamp); - } else { - testDataList.add(timelineMetric.getMetricValues().get(timestamp)); - testTsList.add((double) timestamp); - } - } - } - - if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) { - LOG.info("Not enough train/test data to perform Hsdev analysis."); - continue; - } - - String hsdevTrainSeries = "HsdevTrainSeries"; - double[] trainTs = new double[trainTsList.size()]; - double[] trainData = new double[trainTsList.size()]; - for (int i = 0; i < trainTs.length; i++) { - trainTs[i] = trainTsList.get(i); - trainData[i] = trainDataList.get(i); - } - - String hsdevTestSeries = "HsdevTestSeries"; - double[] testTs = new double[testTsList.size()]; - double[] testData = new double[testTsList.size()]; - for (int i = 0; i < testTs.length; i++) { - testTs[i] = testTsList.get(i); - testData[i] = testDataList.get(i); - } - - LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length); - - DataSeries hsdevTrainData = new DataSeries(hsdevTrainSeries, trainTs, trainData); - DataSeries hsdevTestData = new DataSeries(hsdevTestSeries, testTs, testData); - - MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData); - if (metricAnomaly == null) { - LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. "); - ksTechnique.updateModel(key, false, 10); - } else { - LOG.info("Found Anomaly in Hsdev Test. 
This confirms KS anomaly."); - hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly, - hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd)); - } - } - clearTrackedKsRunKeys(); - - if (!hsdevMetricAnomalies.isEmpty()) { - LOG.info("Publishing Hsdev Anomalies...."); - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(hsdevMetricAnomalies); - metricsCollectorInterface.emitMetrics(timelineMetrics); - } - } - - private void clearTrackedKsRunKeys() { - trackedKsAnomalies.clear(); - } - - private void readInputFile(String fileName) { - trendMetrics.clear(); - try (BufferedReader br = new BufferedReader(new FileReader(fileName))) { - for (String line; (line = br.readLine()) != null; ) { - String[] splits = line.split(","); - LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]); - trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2])); - } - } catch (IOException e) { - LOG.error("Error reading input file : " + e); - } - } - - class KsSingleRunKey implements Serializable{ - - long startTime; - long endTime; - String metricName; - String appId; - String hostname; - - public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) { - this.startTime = startTime; - this.endTime = endTime; - this.metricName = metricName; - this.appId = appId; - this.hostname = hostname; - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java deleted file mode 100644 
index d4db227..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/core/TrendMetric.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.prototype.core; - -import java.io.Serializable; - -public class TrendMetric implements Serializable { - - String metricName; - String appId; - String hostname; - - public TrendMetric(String metricName, String appId, String hostname) { - this.metricName = metricName; - this.appId = appId; - this.hostname = hostname; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java deleted file mode 100644 index c19adda..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/AnomalyDetectionTechnique.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.adservice.prototype.methods; - -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -import java.util.List; - -public abstract class AnomalyDetectionTechnique { - - protected String methodType; - - public abstract List<MetricAnomaly> test(TimelineMetric metric); - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java deleted file mode 100644 index 60ff11c..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/MetricAnomaly.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.adservice.prototype.methods; - -import java.io.Serializable; - -public class MetricAnomaly implements Serializable{ - - private String methodType; - private double anomalyScore; - private String metricKey; - private long timestamp; - private double metricValue; - - - public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) { - this.metricKey = metricKey; - this.timestamp = timestamp; - this.metricValue = metricValue; - this.methodType = methodType; - this.anomalyScore = anomalyScore; - - } - - public String getMethodType() { - return methodType; - } - - public void setMethodType(String methodType) { - this.methodType = methodType; - } - - public double getAnomalyScore() { - return anomalyScore; - } - - public void setAnomalyScore(double anomalyScore) { - this.anomalyScore = anomalyScore; - } - - public void setMetricKey(String metricKey) { - this.metricKey = metricKey; - } - - public String getMetricKey() { - return metricKey; - } - - public void setMetricName(String metricName) { - this.metricKey = metricName; - } - - public long getTimestamp() { - return timestamp; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } - - public double getMetricValue() { - return metricValue; - } - - public void setMetricValue(double metricValue) { - this.metricValue = metricValue; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java deleted file mode 100644 index 593028e..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModel.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.prototype.methods.ema; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import javax.xml.bind.annotation.XmlRootElement; -import java.io.Serializable; - -import static org.apache.ambari.metrics.adservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold; - -@XmlRootElement -public class EmaModel implements Serializable { - - private String metricName; - private String hostname; - private String appId; - private double ema; - private double ems; - private double weight; - private double timessdev; - - private int ctr = 0; - - private static final Log LOG = LogFactory.getLog(EmaModel.class); - - public EmaModel(String name, String hostname, String appId, double weight, double timessdev) { - this.metricName = name; - this.hostname = hostname; - this.appId = appId; - this.weight = weight; - this.timessdev = timessdev; - this.ema = 0.0; - this.ems = 0.0; - } - - public String getMetricName() { - return metricName; - } - - public String getHostname() { - return hostname; - } - - public String getAppId() { - return appId; - } - - public double testAndUpdate(double metricValue) { - - double anomalyScore = 0.0; - LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev); - update(metricValue); - if (ctr > suppressAnomaliesTheshold) { - anomalyScore = test(metricValue); - if (anomalyScore > 0.0) { - LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + - ", timessdev = " + timessdev + ", metricValue = " + metricValue); - } else { - LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + hostname + " - " + "ema = " + ema + ", ems = " + ems + - ", timessdev = " + timessdev + ", metricValue = " + metricValue); - } - } else { - ctr++; - if (ctr > suppressAnomaliesTheshold) { - LOG.info("Ema Model for " + metricName + ":" + 
appId + ":" + hostname + " is ready for testing data."); - } - } - return anomalyScore; - } - - public void update(double metricValue) { - ema = weight * ema + (1 - weight) * metricValue; - ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * Math.pow(metricValue - ema, 2.0)); - LOG.debug("In update : ema = " + ema + ", ems = " + ems); - } - - public double test(double metricValue) { - LOG.debug("In test : ema = " + ema + ", ems = " + ems); - double diff = Math.abs(ema - metricValue) - (timessdev * ems); - LOG.debug("diff = " + diff); - if (diff > 0) { - return Math.abs((metricValue - ema) / ems); //Z score - } else { - return 0.0; - } - } - - public void updateModel(boolean increaseSensitivity, double percent) { - LOG.info("Updating model for " + metricName + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); - double delta = percent / 100; - if (increaseSensitivity) { - delta = delta * -1; - } - this.timessdev = timessdev + delta * timessdev; - //this.weight = Math.min(1.0, weight + delta * weight); - LOG.info("New model parameters " + metricName + " : timessdev = " + timessdev + ", weight = " + weight); - } - - public double getWeight() { - return weight; - } - - public void setWeight(double weight) { - this.weight = weight; - } - - public double getTimessdev() { - return timessdev; - } - - public void setTimessdev(double timessdev) { - this.timessdev = timessdev; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java 
b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java deleted file mode 100644 index 7623f27..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaModelLoader.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.adservice.prototype.methods.ema; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.spark.SparkContext; -import org.apache.spark.mllib.util.Loader; - -public class EmaModelLoader implements Loader<EmaTechnique> { - private static final Log LOG = LogFactory.getLog(EmaModelLoader.class); - - @Override - public EmaTechnique load(SparkContext sc, String path) { - return new EmaTechnique(0.5,3); -// Gson gson = new Gson(); -// try { -// String fileString = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8); -// return gson.fromJson(fileString, EmaTechnique.class); -// } catch (IOException e) { -// LOG.error(e); -// } -// return null; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java deleted file mode 100644 index 7ec17d8..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/ema/EmaTechnique.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.adservice.prototype.methods.ema; - -import com.google.gson.Gson; -import org.apache.ambari.metrics.adservice.prototype.methods.AnomalyDetectionTechnique; -import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.spark.SparkContext; -import org.apache.spark.mllib.util.Saveable; - -import javax.xml.bind.annotation.XmlElement; -import javax.xml.bind.annotation.XmlRootElement; -import java.io.BufferedWriter; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.io.Writer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -@XmlRootElement -public class EmaTechnique extends AnomalyDetectionTechnique implements Serializable, Saveable { - - @XmlElement(name = "trackedEmas") - private Map<String, EmaModel> trackedEmas; - private static final Log LOG = LogFactory.getLog(EmaTechnique.class); - - private double startingWeight = 0.5; - private double startTimesSdev = 3.0; - private String methodType = "ema"; - public static int suppressAnomaliesTheshold = 100; - - public EmaTechnique(double startingWeight, double startTimesSdev, int suppressAnomaliesTheshold) { - trackedEmas = new HashMap<>(); - this.startingWeight = startingWeight; - this.startTimesSdev = startTimesSdev; - 
EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold; - LOG.info("New EmaTechnique......"); - } - - public EmaTechnique(double startingWeight, double startTimesSdev) { - trackedEmas = new HashMap<>(); - this.startingWeight = startingWeight; - this.startTimesSdev = startTimesSdev; - LOG.info("New EmaTechnique......"); - } - - public List<MetricAnomaly> test(TimelineMetric metric) { - String metricName = metric.getMetricName(); - String appId = metric.getAppId(); - String hostname = metric.getHostName(); - String key = metricName + ":" + appId + ":" + hostname; - - EmaModel emaModel = trackedEmas.get(key); - if (emaModel == null) { - LOG.debug("EmaModel not present for " + key); - LOG.debug("Number of tracked Emas : " + trackedEmas.size()); - emaModel = new EmaModel(metricName, hostname, appId, startingWeight, startTimesSdev); - trackedEmas.put(key, emaModel); - } else { - LOG.debug("EmaModel already present for " + key); - } - - List<MetricAnomaly> anomalies = new ArrayList<>(); - - for (Long timestamp : metric.getMetricValues().keySet()) { - double metricValue = metric.getMetricValues().get(timestamp); - double anomalyScore = emaModel.testAndUpdate(metricValue); - if (anomalyScore > 0.0) { - LOG.info("Found anomaly for : " + key + ", anomalyScore = " + anomalyScore); - MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, metricValue, methodType, anomalyScore); - anomalies.add(metricAnomaly); - } else { - LOG.debug("Discarding non-anomaly for : " + key); - } - } - return anomalies; - } - - public boolean updateModel(TimelineMetric timelineMetric, boolean increaseSensitivity, double percent) { - String metricName = timelineMetric.getMetricName(); - String appId = timelineMetric.getAppId(); - String hostname = timelineMetric.getHostName(); - String key = metricName + "_" + appId + "_" + hostname; - - - EmaModel emaModel = trackedEmas.get(key); - - if (emaModel == null) { - LOG.warn("EMA Model for " + key + " not found"); - return false; - } - 
emaModel.updateModel(increaseSensitivity, percent); - - return true; - } - - @Override - public void save(SparkContext sc, String path) { - Gson gson = new Gson(); - try { - String json = gson.toJson(this); - try (Writer writer = new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(path), "utf-8"))) { - writer.write(json); - } - } catch (IOException e) { - LOG.error(e); - } - } - - @Override - public String formatVersion() { - return "1.0"; - } - - public Map<String, EmaModel> getTrackedEmas() { - return trackedEmas; - } - - public double getStartingWeight() { - return startingWeight; - } - - public double getStartTimesSdev() { - return startTimesSdev; - } - -} - http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java deleted file mode 100644 index 855cc70..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/hsdev/HsdevTechnique.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.adservice.prototype.methods.hsdev; - -import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.Serializable; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.median; -import static org.apache.ambari.metrics.adservice.prototype.common.StatisticUtils.sdev; - -public class HsdevTechnique implements Serializable { - - private Map<String, Double> hsdevMap; - private String methodType = "hsdev"; - private static final Log LOG = LogFactory.getLog(HsdevTechnique.class); - - public HsdevTechnique() { - hsdevMap = new HashMap<>(); - } - - public MetricAnomaly runHsdevTest(String key, DataSeries trainData, DataSeries testData) { - int testLength = testData.values.length; - int trainLength = trainData.values.length; - - if (trainLength < testLength) { - LOG.info("Not enough train data."); - return null; - } - - if (!hsdevMap.containsKey(key)) { - hsdevMap.put(key, 3.0); - } - - double n = hsdevMap.get(key); - - double historicSd = sdev(trainData.values, false); - double historicMedian = median(trainData.values); - double currentMedian = median(testData.values); - - - if (historicSd > 0) { - double diff = Math.abs(currentMedian - historicMedian); - LOG.info("Found anomaly for metric : " + key + " in the period ending " + new 
Date((long)testData.ts[testLength - 1])); - LOG.info("Current median = " + currentMedian + ", Historic Median = " + historicMedian + ", HistoricSd = " + historicSd); - - if (diff > n * historicSd) { - double zScore = diff / historicSd; - LOG.info("Z Score of current series : " + zScore); - return new MetricAnomaly(key, - (long) testData.ts[testLength - 1], - testData.values[testLength - 1], - methodType, - zScore); - } - } - - return null; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java deleted file mode 100644 index 0dc679e..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/methods/kstest/KSTechnique.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.adservice.prototype.methods.kstest; - -import org.apache.ambari.metrics.adservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.adservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.adservice.prototype.core.RFunctionInvoker; -import org.apache.ambari.metrics.adservice.prototype.methods.MetricAnomaly; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -public class KSTechnique implements Serializable { - - private String methodType = "ks"; - private Map<String, Double> pValueMap; - private static final Log LOG = LogFactory.getLog(KSTechnique.class); - - public KSTechnique() { - pValueMap = new HashMap(); - } - - public MetricAnomaly runKsTest(String key, DataSeries trainData, DataSeries testData) { - - int testLength = testData.values.length; - int trainLength = trainData.values.length; - - if (trainLength < testLength) { - LOG.info("Not enough train data."); - return null; - } - - if (!pValueMap.containsKey(key)) { - pValueMap.put(key, 0.05); - } - double pValue = pValueMap.get(key); - - ResultSet result = RFunctionInvoker.ksTest(trainData, testData, Collections.singletonMap("ks.p_value", String.valueOf(pValue))); - if (result == null) { - LOG.error("Resultset is null when invoking KS R function..."); - return null; - } - - if (result.resultset.size() > 0) { - - LOG.info("Is size 1 ? 
result size = " + result.resultset.get(0).length); - LOG.info("p_value = " + result.resultset.get(3)[0]); - double dValue = result.resultset.get(2)[0]; - - return new MetricAnomaly(key, - (long) testData.ts[testLength - 1], - testData.values[testLength - 1], - methodType, - dValue); - } - - return null; - } - - public void updateModel(String metricKey, boolean increaseSensitivity, double percent) { - - LOG.info("Updating KS model for " + metricKey + " with increaseSensitivity = " + increaseSensitivity + ", percent = " + percent); - - if (!pValueMap.containsKey(metricKey)) { - LOG.error("Unknown metric key : " + metricKey); - LOG.info("pValueMap :" + pValueMap.toString()); - return; - } - - double delta = percent / 100; - if (!increaseSensitivity) { - delta = delta * -1; - } - - double pValue = pValueMap.get(metricKey); - double newPValue = Math.min(1.0, pValue + delta * pValue); - pValueMap.put(metricKey, newPValue); - LOG.info("New pValue = " + newPValue); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java deleted file mode 100644 index 9a002a1..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.adservice.prototype.testing.utilities; - -import javax.xml.bind.annotation.XmlRootElement; -import java.util.List; -import java.util.Map; - -@XmlRootElement -public class MetricAnomalyDetectorTestInput { - - public MetricAnomalyDetectorTestInput() { - } - - //Train data - private String trainDataName; - private String trainDataType; - private Map<String, String> trainDataConfigs; - private int trainDataSize; - - //Test data - private String testDataName; - private String testDataType; - private Map<String, String> testDataConfigs; - private int testDataSize; - - //Algorithm data - private List<String> methods; - private Map<String, String> methodConfigs; - - public String getTrainDataName() { - return trainDataName; - } - - public void setTrainDataName(String trainDataName) { - this.trainDataName = trainDataName; - } - - public String getTrainDataType() { - return trainDataType; - } - - public void setTrainDataType(String trainDataType) { - this.trainDataType = trainDataType; - } - - public Map<String, String> getTrainDataConfigs() { - return trainDataConfigs; - } - - public void setTrainDataConfigs(Map<String, String> trainDataConfigs) { - this.trainDataConfigs = trainDataConfigs; - } - - public String getTestDataName() { - 
return testDataName; - } - - public void setTestDataName(String testDataName) { - this.testDataName = testDataName; - } - - public String getTestDataType() { - return testDataType; - } - - public void setTestDataType(String testDataType) { - this.testDataType = testDataType; - } - - public Map<String, String> getTestDataConfigs() { - return testDataConfigs; - } - - public void setTestDataConfigs(Map<String, String> testDataConfigs) { - this.testDataConfigs = testDataConfigs; - } - - public Map<String, String> getMethodConfigs() { - return methodConfigs; - } - - public void setMethodConfigs(Map<String, String> methodConfigs) { - this.methodConfigs = methodConfigs; - } - - public int getTrainDataSize() { - return trainDataSize; - } - - public void setTrainDataSize(int trainDataSize) { - this.trainDataSize = trainDataSize; - } - - public int getTestDataSize() { - return testDataSize; - } - - public void setTestDataSize(int testDataSize) { - this.testDataSize = testDataSize; - } - - public List<String> getMethods() { - return methods; - } - - public void setMethods(List<String> methods) { - this.methods = methods; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java deleted file mode 100644 index 10b3a71..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/MetricAnomalyTester.java +++ /dev/null @@ -1,151 +0,0 
@@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.metrics.adservice.prototype.testing.utilities; - -/** - * Class which was originally used to send test series from AMS to Spark through Kafka. - */ -public class MetricAnomalyTester { - -// public static String appId = MetricsCollectorInterface.serviceName; -// static final Log LOG = LogFactory.getLog(MetricAnomalyTester.class); -// static Map<String, TimelineMetric> timelineMetricMap = new HashMap<>(); -// -// public static TimelineMetrics runTestAnomalyRequest(MetricAnomalyDetectorTestInput input) throws UnknownHostException { -// -// long currentTime = System.currentTimeMillis(); -// TimelineMetrics timelineMetrics = new TimelineMetrics(); -// String hostname = InetAddress.getLocalHost().getHostName(); -// -// //Train data -// TimelineMetric metric1 = new TimelineMetric(); -// if (StringUtils.isNotEmpty(input.getTrainDataName())) { -// metric1 = timelineMetricMap.get(input.getTrainDataName()); -// if (metric1 == null) { -// metric1 = new TimelineMetric(); -// double[] trainSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTrainDataType(), input.getTrainDataSize(), input.getTrainDataConfigs()); -// 
metric1.setMetricName(input.getTrainDataName()); -// metric1.setAppId(appId); -// metric1.setHostName(hostname); -// metric1.setStartTime(currentTime); -// metric1.setInstanceId(null); -// metric1.setMetricValues(getAsTimeSeries(currentTime, trainSeries)); -// timelineMetricMap.put(input.getTrainDataName(), metric1); -// } -// timelineMetrics.getMetrics().add(metric1); -// } else { -// LOG.error("No train data name specified"); -// } -// -// //Test data -// TimelineMetric metric2 = new TimelineMetric(); -// if (StringUtils.isNotEmpty(input.getTestDataName())) { -// metric2 = timelineMetricMap.get(input.getTestDataName()); -// if (metric2 == null) { -// metric2 = new TimelineMetric(); -// double[] testSeries = MetricSeriesGeneratorFactory.generateSeries(input.getTestDataType(), input.getTestDataSize(), input.getTestDataConfigs()); -// metric2.setMetricName(input.getTestDataName()); -// metric2.setAppId(appId); -// metric2.setHostName(hostname); -// metric2.setStartTime(currentTime); -// metric2.setInstanceId(null); -// metric2.setMetricValues(getAsTimeSeries(currentTime, testSeries)); -// timelineMetricMap.put(input.getTestDataName(), metric2); -// } -// timelineMetrics.getMetrics().add(metric2); -// } else { -// LOG.warn("No test data name specified"); -// } -// -// //Invoke method -// if (CollectionUtils.isNotEmpty(input.getMethods())) { -// RFunctionInvoker.setScriptsDir("/etc/ambari-metrics-collector/conf/R-scripts"); -// for (String methodType : input.getMethods()) { -// ResultSet result = RFunctionInvoker.executeMethod(methodType, getAsDataSeries(metric1), getAsDataSeries(metric2), input.getMethodConfigs()); -// TimelineMetric timelineMetric = getAsTimelineMetric(result, methodType, input, currentTime, hostname); -// if (timelineMetric != null) { -// timelineMetrics.getMetrics().add(timelineMetric); -// } -// } -// } else { -// LOG.warn("No anomaly method requested"); -// } -// -// return timelineMetrics; -// } -// -// -// private static TimelineMetric 
getAsTimelineMetric(ResultSet result, String methodType, MetricAnomalyDetectorTestInput input, long currentTime, String hostname) { -// -// if (result == null) { -// return null; -// } -// -// TimelineMetric timelineMetric = new TimelineMetric(); -// if (methodType.equals("tukeys") || methodType.equals("ema")) { -// timelineMetric.setMetricName(input.getTrainDataName() + "_" + input.getTestDataName() + "_" + methodType + "_" + currentTime); -// timelineMetric.setHostName(hostname); -// timelineMetric.setAppId(appId); -// timelineMetric.setInstanceId(null); -// timelineMetric.setStartTime(currentTime); -// -// TreeMap<Long, Double> metricValues = new TreeMap<>(); -// if (result.resultset.size() > 0) { -// double[] ts = result.resultset.get(0); -// double[] metrics = result.resultset.get(1); -// for (int i = 0; i < ts.length; i++) { -// if (i == 0) { -// timelineMetric.setStartTime((long) ts[i]); -// } -// metricValues.put((long) ts[i], metrics[i]); -// } -// } -// timelineMetric.setMetricValues(metricValues); -// return timelineMetric; -// } -// return null; -// } -// -// -// private static TreeMap<Long, Double> getAsTimeSeries(long currentTime, double[] values) { -// -// long startTime = currentTime - (values.length - 1) * 60 * 1000; -// TreeMap<Long, Double> metricValues = new TreeMap<>(); -// -// for (int i = 0; i < values.length; i++) { -// metricValues.put(startTime, values[i]); -// startTime += (60 * 1000); -// } -// return metricValues; -// } -// -// private static DataSeries getAsDataSeries(TimelineMetric timelineMetric) { -// -// TreeMap<Long, Double> metricValues = timelineMetric.getMetricValues(); -// double[] timestamps = new double[metricValues.size()]; -// double[] values = new double[metricValues.size()]; -// int i = 0; -// -// for (Long timestamp : metricValues.keySet()) { -// timestamps[i] = timestamp; -// values[i++] = metricValues.get(timestamp); -// } -// return new DataSeries(timelineMetric.getMetricName() + "_" + timelineMetric.getAppId() + "_" 
+ timelineMetric.getHostName(), timestamps, values); -// } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java deleted file mode 100644 index 3b2605b..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestMetricSeriesGenerator.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.adservice.prototype.testing.utilities; - -/** - * Class which was originally used to send test series from AMS to Spark through Kafka. 
- */ - -public class TestMetricSeriesGenerator { - //implements Runnable { - -// private Map<TestSeriesInputRequest, AbstractMetricSeries> configuredSeries = new HashMap<>(); -// private static final Log LOG = LogFactory.getLog(TestMetricSeriesGenerator.class); -// private TimelineMetricStore metricStore; -// private String hostname; -// -// public TestMetricSeriesGenerator(TimelineMetricStore metricStore) { -// this.metricStore = metricStore; -// try { -// this.hostname = InetAddress.getLocalHost().getHostName(); -// } catch (UnknownHostException e) { -// e.printStackTrace(); -// } -// } -// -// public void addSeries(TestSeriesInputRequest inputRequest) { -// if (!configuredSeries.containsKey(inputRequest)) { -// AbstractMetricSeries metricSeries = MetricSeriesGeneratorFactory.generateSeries(inputRequest.getSeriesType(), inputRequest.getConfigs()); -// configuredSeries.put(inputRequest, metricSeries); -// LOG.info("Added series " + inputRequest.getSeriesName()); -// } -// } -// -// public void removeSeries(String seriesName) { -// boolean isPresent = false; -// TestSeriesInputRequest tbd = null; -// for (TestSeriesInputRequest inputRequest : configuredSeries.keySet()) { -// if (inputRequest.getSeriesName().equals(seriesName)) { -// isPresent = true; -// tbd = inputRequest; -// } -// } -// if (isPresent) { -// LOG.info("Removing series " + seriesName); -// configuredSeries.remove(tbd); -// } else { -// LOG.info("Series not found : " + seriesName); -// } -// } -// -// @Override -// public void run() { -// long currentTime = System.currentTimeMillis(); -// TimelineMetrics timelineMetrics = new TimelineMetrics(); -// -// for (TestSeriesInputRequest input : configuredSeries.keySet()) { -// AbstractMetricSeries metricSeries = configuredSeries.get(input); -// TimelineMetric timelineMetric = new TimelineMetric(); -// timelineMetric.setMetricName(input.getSeriesName()); -// timelineMetric.setAppId("anomaly-engine-test-metric"); -// timelineMetric.setInstanceId(null); -// 
timelineMetric.setStartTime(currentTime); -// timelineMetric.setHostName(hostname); -// TreeMap<Long, Double> metricValues = new TreeMap(); -// metricValues.put(currentTime, metricSeries.nextValue()); -// timelineMetric.setMetricValues(metricValues); -// timelineMetrics.addOrMergeTimelineMetric(timelineMetric); -// LOG.info("Emitting metric with appId = " + timelineMetric.getAppId()); -// } -// try { -// LOG.info("Publishing test metrics for " + timelineMetrics.getMetrics().size() + " series."); -// metricStore.putMetrics(timelineMetrics); -// } catch (Exception e) { -// LOG.error(e); -// } -// } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java deleted file mode 100644 index d7db9ca..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/java/org/apache/ambari/metrics/adservice/prototype/testing/utilities/TestSeriesInputRequest.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.adservice.prototype.testing.utilities; - -import org.apache.htrace.fasterxml.jackson.core.JsonProcessingException; -import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper; - -import javax.xml.bind.annotation.XmlRootElement; -import java.util.Collections; -import java.util.Map; - -@XmlRootElement -public class TestSeriesInputRequest { - - private String seriesName; - private String seriesType; - private Map<String, String> configs; - - public TestSeriesInputRequest() { - } - - public TestSeriesInputRequest(String seriesName, String seriesType, Map<String, String> configs) { - this.seriesName = seriesName; - this.seriesType = seriesType; - this.configs = configs; - } - - public String getSeriesName() { - return seriesName; - } - - public void setSeriesName(String seriesName) { - this.seriesName = seriesName; - } - - public String getSeriesType() { - return seriesType; - } - - public void setSeriesType(String seriesType) { - this.seriesType = seriesType; - } - - public Map<String, String> getConfigs() { - return configs; - } - - public void setConfigs(Map<String, String> configs) { - this.configs = configs; - } - - @Override - public boolean equals(Object o) { - TestSeriesInputRequest anotherInput = (TestSeriesInputRequest)o; - return anotherInput.getSeriesName().equals(this.getSeriesName()); - } - - @Override - public int hashCode() { - return seriesName.hashCode(); - } - - public static void main(String[] args) { - - ObjectMapper objectMapper = new ObjectMapper(); - TestSeriesInputRequest 
testSeriesInputRequest = new TestSeriesInputRequest("test", "ema", Collections.singletonMap("key","value")); - try { - System.out.print(objectMapper.writeValueAsString(testSeriesInputRequest)); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/25c18121/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R deleted file mode 100644 index 0b66095..0000000 --- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/R-scripts/ema.R +++ /dev/null @@ -1,96 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
# R-scripts/ema.R
#
# Exponentially weighted moving average (EMA) anomaly detection.
#
#   EMA   <- w * EMA + (1 - w) * x
#   EMS   <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
#   Alarm <- abs(x - EMA) > n * EMS

# Flags test points that deviate from a single, global EMA.
#
# Args:
#   train_data: training values, either a plain numeric vector or a two-column
#               (timestamp, value) data frame / matrix, in which case column 2
#               is used.
#   test_data:  two-column (timestamp, value) data frame.
#   w:          EMA weight given to the previous average.
#   n:          alarm threshold, in multiples of EMS.
#
# Returns:
#   A data frame with columns "TS" and "Value", one row per flagged test point,
#   or an empty data.frame() when nothing is flagged. The EMA/EMS keep being
#   updated with every test point, flagged or not.
ema_global <- function(train_data, test_data, w, n) {

  # Iterating a data frame with for() walks its columns, not its values, so
  # pick the value column explicitly when a frame is passed.
  if (is.data.frame(train_data) || is.matrix(train_data)) {
    train_values <- train_data[, 2]
  } else {
    train_values <- train_data
  }

  ema <- 0
  ems <- 0

  # Train step
  for (x in train_values) {
    ema <- w * ema + (1 - w) * x
    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
  }

  # Test step: seq_len() yields no iterations for an empty window
  n_test <- nrow(test_data)
  flagged <- logical(n_test)
  for (i in seq_len(n_test)) {
    x <- test_data[i, 2]
    if (abs(x - ema) > n * ems) {
      flagged[i] <- TRUE
    }
    ema <- w * ema + (1 - w) * x
    ems <- sqrt(w * ems^2 + (1 - w) * (x - ema)^2)
  }

  if (!any(flagged)) {
    return (data.frame())
  }
  data.frame(TS = as.numeric(test_data[flagged, 1]),
             Value = as.numeric(test_data[flagged, 2]))
}

# Flags test points that deviate from the EMA of their own day of the week.
#
# Seven independent EMA/EMS pairs are kept, one per weekday. The weekday is
# derived from the timestamp interpreted as epoch milliseconds in GMT.
#
# Args:
#   train_data: two-column (timestamp in epoch ms, value) data frame.
#   test_data:  two-column (timestamp in epoch ms, value) data frame.
#   w:          EMA weight given to the previous average.
#   n:          alarm threshold, in multiples of EMS.
#
# Returns:
#   A data frame with columns "TS" and "Value", one row per flagged test point,
#   or an empty data.frame() when nothing is flagged.
ema_daily <- function(train_data, test_data, w, n) {

  # POSIXlt$wday runs 0 (Sunday) .. 6, R vectors are 1-based: shift by one.
  # Training and testing must use the same slot for the same weekday.
  weekday_slot <- function(ts) {
    as.POSIXlt(as.numeric(ts) / 1000, origin = "1970-01-01", tz = "GMT")$wday + 1
  }

  ema <- vector("numeric", 7)
  ems <- vector("numeric", 7)

  # Train step
  for (i in seq_len(nrow(train_data))) {
    x <- train_data[i, 2]
    slot <- weekday_slot(train_data[i, 1])
    ema[slot] <- w * ema[slot] + (1 - w) * x
    ems[slot] <- sqrt(w * ems[slot]^2 + (1 - w) * (x - ema[slot])^2)
  }

  # Test step
  n_test <- nrow(test_data)
  flagged <- logical(n_test)
  for (i in seq_len(n_test)) {
    x <- test_data[i, 2]
    slot <- weekday_slot(test_data[i, 1])
    if (abs(x - ema[slot]) > n * ems[slot]) {
      flagged[i] <- TRUE
    }
    ema[slot] <- w * ema[slot] + (1 - w) * x
    ems[slot] <- sqrt(w * ems[slot]^2 + (1 - w) * (x - ema[slot])^2)
  }

  if (!any(flagged)) {
    return (data.frame())
  }
  data.frame(TS = as.numeric(test_data[flagged, 1]),
             Value = as.numeric(test_data[flagged, 2]))
}
# R-scripts/hsdev.r
#
# Historical standard-deviation check: compares the median of the test window
# with the median of past values that fall on the same day of the week.
#
# Args:
#   train_data:           two-column (timestamp in epoch ms, value) data frame.
#   test_data:            two-column (timestamp in epoch ms, value) data frame.
#   n:                    alarm threshold, in multiples of the historic
#                         standard deviation.
#   num_historic_periods: how many periods to look back from the test start.
#   interval:             the look-back start is rounded down to a multiple of
#                         this many milliseconds.
#   period:               length of one period in milliseconds.
#
# Returns:
#   An empty data.frame() when the test window is empty, when fewer than
#   2 * nrow(test_data) historic values are available, or when the medians are
#   within n standard deviations. Otherwise a one-row data frame with columns
#   "TS Start", "TS End", "Current Median", "Past Median", "Past SD".
hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {

  anomalies <- data.frame()

  n_test <- nrow(test_data)
  if (n_test == 0) {
    return (anomalies)
  }

  # Day of week (0 = Sunday) of epoch-millisecond timestamps, in GMT
  weekday_of <- function(ts) {
    as.POSIXlt(as.numeric(ts) / 1000, origin = "1970-01-01", tz = "GMT")$wday
  }

  test_start <- test_data[1, 1]
  # Last ROW of the test window (length() of a row would give the column count)
  test_end <- test_data[n_test, 1]

  train_start <- test_start - num_historic_periods * period
  # round to start of day
  train_start <- train_start - (train_start %% interval)

  test_data_day <- weekday_of(test_start)

  # Walk the training rows newest-to-oldest and stop at the first one older
  # than train_start; everything before that point is ignored.
  h_data <- c()
  if (nrow(train_data) > 0) {
    ts_desc <- rev(train_data[, 1])
    x_desc <- rev(train_data[, 2])
    too_old <- which(ts_desc < train_start)
    if (length(too_old) > 0) {
      keep <- seq_len(too_old[1] - 1)
      ts_desc <- ts_desc[keep]
      x_desc <- x_desc[keep]
    }
    h_data <- x_desc[weekday_of(ts_desc) == test_data_day]
  }

  if (length(h_data) < 2 * n_test) {
    cat ("\nNot enough training data")
    return (anomalies)
  }

  past_median <- median(h_data)
  past_sd <- sd(h_data)
  curr_median <- median(test_data[, 2])

  if (abs(curr_median - past_median) > n * past_sd) {
    anomalies <- data.frame(test_start, test_end, curr_median, past_median, past_sd)
    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
  }

  return (anomalies)
}
# R-scripts/iforest.R
#
# Isolation-forest anomaly detection over one or more metrics.
#
# Fetches metrics with get_data(url), joins them on timestamp into one frame
# (column "TS" plus one column per metric), trains on the rows inside
# [train_start, train_end] and scores the rows inside [test_start, test_end].
#
# NOTE(review): get_data, IsolationTrees and AnomalyScore are not defined in
# this file; the latter two presumably come from the IsolationForest package.
# Confirm what is loaded before this script is sourced.
#
# Args:
#   url:             passed unchanged to get_data().
#   train_start, train_end: inclusive timestamp bounds of the training window.
#   test_start, test_end:   inclusive timestamp bounds of the test window.
#   threshold_score: rows whose anomaly score is strictly greater are reported.
#
# Returns:
#   A data frame with columns "TS", "Anomaly Score", "Path length", one row per
#   reported test row, or an empty data.frame() when nothing is reported.
ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {

  res <- get_data(url)
  num_metrics <- length(res$metrics)
  if (num_metrics == 0) {
    return (data.frame())
  }

  # One (TS, <metric name>) frame per metric
  metric_frame <- function(metric) {
    frame <- data.frame(as.numeric(names(metric$metrics)), as.numeric(metric$metrics))
    names(frame) <- c("TS", metric$metricname)
    frame
  }

  data <- metric_frame(res$metrics[[1]])
  # seq_len(n)[-1] is empty for a single metric (2:1 would count down)
  for (i in seq_len(num_metrics)[-1]) {
    data <- merge(data, metric_frame(res$metrics[[i]]), by = "TS")
  }

  # Columns 2 .. num_metrics + 1 hold the metric values
  metric_cols <- 2:(num_metrics + 1)

  # Filter on the merged frame's own timestamps so that row positions line up
  train_rows <- which(data$TS >= train_start & data$TS <= train_end)
  test_rows <- which(data$TS >= test_start & data$TS <= test_end)

  algo_data <- data[train_rows, metric_cols, drop = FALSE]
  iForest <- IsolationTrees(algo_data)
  test_data <- data[test_rows, , drop = FALSE]

  if_res <- AnomalyScore(test_data[, metric_cols, drop = FALSE], iForest)

  flagged <- which(if_res$outF > threshold_score)
  if (length(flagged) == 0) {
    return (data.frame())
  }

  anomalies <- data.frame(test_data[flagged, 1], if_res$outF[flagged], if_res$pathLength[flagged])
  names(anomalies) <- c("TS", "Anomaly Score", "Path length")
  return (anomalies)
}
# R-scripts/kstest.r
#
# Two-sample Kolmogorov-Smirnov check between the training and test values.
#
# Args:
#   train_data: two-column (timestamp, value) data frame.
#   test_data:  two-column (timestamp, value) data frame; must be non-empty,
#               ks.test() raises an error otherwise.
#   p_value:    significance level; the window is reported when the test's
#               p-value is strictly below it.
#
# Returns:
#   An empty data.frame() when the distributions are not significantly
#   different, otherwise a one-row data frame with columns
#   "TS Start", "TS End", "D", "p-value".
ams_ks <- function(train_data, test_data, p_value) {

  anomalies <- data.frame()
  ks <- ks.test(train_data[, 2], test_data[, 2])

  if (ks$p.value < p_value) {
    first_ts <- test_data[1, 1]
    # Last ROW of the window (length() of a data frame is its column count)
    last_ts <- test_data[nrow(test_data), 1]
    anomalies <- data.frame(first_ts, last_ts, unname(ks$statistic), ks$p.value)
    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
  }

  return (anomalies)
}
# R-scripts/test.R
#
# Walk-forward driver that replays a series through the detectors one window
# at a time and accumulates what they report in the variables below.

tukeys_anomalies <- data.frame()
ema_global_anomalies <- data.frame()
ema_daily_anomalies <- data.frame()
ks_anomalies <- data.frame()
hsdev_anomalies <- data.frame()

# Resets every accumulator to an empty data frame.
# Uses <<- so the script-level variables are replaced; a plain <- would only
# create locals and leave results from a previous run in place.
init <- function() {
  tukeys_anomalies <<- data.frame()
  ema_global_anomalies <<- data.frame()
  ema_daily_anomalies <<- data.frame()
  ks_anomalies <<- data.frame()
  hsdev_anomalies <<- data.frame()
}

# Runs the enabled detectors over consecutive train/test windows of `data`.
#
# Args:
#   data: data frame whose first column is the timestamp (epoch ms) and is
#         named "TS"; the sampling step is taken from the first two rows.
#
# Windows are delimited by get_next_day_boundary(). After each iteration the
# test window becomes the next training window. Only hsdev_daily is currently
# enabled; it is trained on everything before the test window.
#
# Returns:
#   The accumulated hsdev_daily results for this run.
test_methods <- function(data) {

  init()

  limit <- data[length(data[,1]),1]
  step <- data[2,1] - data[1,1]

  train_start <- data[1,1]
  train_end <- get_next_day_boundary(train_start, step, limit)
  test_start <- train_end + step
  test_end <- get_next_day_boundary(test_start, step, limit)
  i <- 1
  day <- 24*60*60*1000

  while (test_start < limit) {

    # progress indicator: window number
    print (i)
    i <- i + 1
    train_data <- data[which(data$TS >= train_start & data$TS <= train_end),]
    test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]

    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
    hsdev_train_data <- data[which(data$TS < test_start),]
    hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day))

    # slide: the test window becomes the next training window
    train_start <- test_start
    train_end <- get_next_day_boundary(train_start, step, limit)
    test_start <- train_end + step
    test_end <- get_next_day_boundary(test_start, step, limit)
  }
  return (hsdev_anomalies)
}

# Finds the next day boundary at or after `start`, stepping by `step`.
#
# Args:
#   start: timestamp to start from (epoch ms).
#   step:  increment between candidate timestamps.
#   limit: last timestamp that is still examined.
#
# Returns:
#   -1 when start is already past limit; the first candidate <= limit that
#   lies exactly on a boundary; otherwise the first candidate past limit.
get_next_day_boundary <- function(start, step, limit) {

  day_ms <- 24*60*60*1000
  # 8 hours into the UTC day. NOTE(review): presumably local midnight for a
  # UTC-8 zone -- confirm before reusing with other data.
  boundary_offset_ms <- 28800000

  if (start > limit) {
    return (-1)
  }

  while (start <= limit) {
    if ((start %% day_ms) == boundary_offset_ms) {
      return (start)
    }
    start <- start + step
  }
  return (start)
}
# R-scripts/tukeys.r
#
# Tukey's fences: flags test values lying more than n interquartile ranges
# outside the first/third quartile of the training values.
#
# Args:
#   train_data: two-column (timestamp, value) data frame.
#   test_data:  two-column (timestamp, value) data frame; may be empty.
#   n:          fence width, in multiples of the training IQR.
#
# Returns:
#   A data frame with columns "TS", "Value", "niqr", one row per flagged test
#   value, or an empty data.frame() when nothing is flagged. "niqr" is the
#   distance from the nearer quartile in IQR units, and 0 when the IQR is 0.
#   Test values that are NA are never flagged.
ams_tukeys <- function(train_data, test_data, n) {

  quartiles <- quantile(train_data[, 2])
  q1 <- unname(quartiles[2])
  q3 <- unname(quartiles[4])
  iqr <- q3 - q1

  # The fences depend only on the training data: compute them once
  lb <- q1 - n * iqr
  ub <- q3 + n * iqr

  values <- test_data[, 2]
  below <- values < lb
  above <- values > ub
  # which() drops NA comparisons and yields integer(0) for an empty window
  flagged <- which(below | above)

  if (length(flagged) == 0) {
    return (data.frame())
  }

  x <- values[flagged]
  if (iqr != 0) {
    niqr <- ifelse(below[flagged], (q1 - x) / iqr, (x - q3) / iqr)
  } else {
    niqr <- rep(0, length(x))
  }

  anomalies <- data.frame(test_data[flagged, 1], x, niqr)
  names(anomalies) <- c("TS", "Value", "niqr")
  return (anomalies)
}